// missiond_core/core/mission_control.rs

1//! Mission Control - Main coordinator
2//!
3//! Unified management of task queue, slot configuration, agent processes, and inbox.
4
5use super::{
6    Inbox, PermissionConfig, PermissionPolicy, PermissionRule, ProcessManager, SlotManager,
7    SpawnOptions,
8};
9use crate::db::MissionDB;
10use crate::types::{
11    CreateTaskInput, EventType, InboxMessage, Slot, SlotsConfig, Task, TaskStatus, TaskUpdate,
12};
13use anyhow::{anyhow, Result};
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16use tokio::sync::RwLock;
17use tracing::{error, info};
18use uuid::Uuid;
19
/// How an agent is driven when it runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionMode {
    /// Batch mode: run claude -p
    Batch,
    /// PTY mode: interactive terminal
    Pty,
}

impl ExecutionMode {
    /// Returns the lowercase name of the mode: `"batch"` or `"pty"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Batch => "batch",
            Self::Pty => "pty",
        }
    }
}
37
38/// Options for creating MissionControl
39pub struct MissionControlOptions {
40    /// Database path
41    pub db_path: PathBuf,
42    /// Slots configuration file path
43    pub slots_config_path: PathBuf,
44    /// Permission configuration file path (optional)
45    pub permission_config_path: Option<PathBuf>,
46    /// Logs directory (optional)
47    pub logs_dir: Option<PathBuf>,
48    /// Default execution mode
49    pub default_mode: Option<ExecutionMode>,
50}
51
52/// Mission Control
53///
54/// Main coordinator for task queue, slot configuration, agent processes, and inbox.
55pub struct MissionControl {
56    db: Arc<MissionDB>,
57    slot_manager: SlotManager,
58    process_manager: ProcessManager,
59    permission_policy: PermissionPolicy,
60    inbox: Inbox,
61    started: RwLock<bool>,
62    logs_dir: PathBuf,
63    default_mode: RwLock<ExecutionMode>,
64}
65
66impl MissionControl {
67    /// Create a new MissionControl
68    pub fn new(options: MissionControlOptions) -> Result<Self> {
69        // Initialize database
70        let db = Arc::new(MissionDB::open(&options.db_path)?);
71
72        // Logs directory
73        let logs_dir = options
74            .logs_dir
75            .unwrap_or_else(|| options.db_path.parent().unwrap().join("logs"));
76
77        let default_mode = options.default_mode.unwrap_or(ExecutionMode::Batch);
78
79        // Initialize components
80        let slot_manager = SlotManager::new(Arc::clone(&db));
81        let process_manager = ProcessManager::new(Arc::clone(&db), logs_dir.clone());
82        let inbox = Inbox::new(Arc::clone(&db));
83
84        // Load permission config
85        let permission_config_path = options.permission_config_path.unwrap_or_else(|| {
86            options
87                .db_path
88                .parent()
89                .unwrap()
90                .join("config")
91                .join("permissions.yaml")
92        });
93        let permission_policy = PermissionPolicy::new(&permission_config_path);
94
95        // Load slots config
96        let mc = Self {
97            db,
98            slot_manager,
99            process_manager,
100            permission_policy,
101            inbox,
102            started: RwLock::new(false),
103            logs_dir,
104            default_mode: RwLock::new(default_mode),
105        };
106
107        mc.load_slots_config(&options.slots_config_path)?;
108
109        info!("MissionControl initialized");
110        Ok(mc)
111    }
112
113    /// Load slots configuration
114    fn load_slots_config(&self, config_path: &Path) -> Result<()> {
115        let content = std::fs::read_to_string(config_path)?;
116        let config: SlotsConfig = serde_yaml::from_str(&content)?;
117
118        // Load into SlotManager
119        self.slot_manager.load_slots(config.slots.clone());
120
121        // Initialize process state
122        for slot_config in &config.slots {
123            if let Some(slot) = self.slot_manager.get_slot(&slot_config.id) {
124                self.process_manager.init_slot(&slot);
125            }
126        }
127
128        info!(count = config.slots.len(), "Slots loaded");
129        Ok(())
130    }
131
132    /// Start the service
133    pub async fn start(&self) -> Result<()> {
134        let mut started = self.started.write().await;
135        if *started {
136            return Ok(());
137        }
138        *started = true;
139
140        info!("MissionControl started");
141        Ok(())
142    }
143
144    /// Stop the service
145    pub async fn stop(&self) -> Result<()> {
146        let mut started = self.started.write().await;
147        if !*started {
148            return Ok(());
149        }
150        *started = false;
151
152        // Shutdown all processes
153        self.process_manager.shutdown().await;
154
155        info!("MissionControl stopped");
156        Ok(())
157    }
158
159    // ============ Task Operations ============
160
161    /// Submit a task (async, returns immediately)
162    pub fn submit(&self, role: &str, prompt: &str) -> Result<String> {
163        let input = CreateTaskInput {
164            role: role.to_string(),
165            prompt: prompt.to_string(),
166        };
167        let task = self.create_task(input)?;
168        Ok(task.id)
169    }
170
171    /// Synchronous ask (submit + wait)
172    pub async fn ask_expert(
173        &self,
174        role: &str,
175        question: &str,
176        _timeout_ms: u64,
177    ) -> Result<String> {
178        let input = CreateTaskInput {
179            role: role.to_string(),
180            prompt: question.to_string(),
181        };
182        let task = self.create_task(input)?;
183
184        // Synchronous execution
185        self.process_task(&task).await
186    }
187
188    /// Create a task
189    fn create_task(&self, input: CreateTaskInput) -> Result<Task> {
190        let now = chrono::Utc::now().timestamp_millis();
191        let task = Task {
192            id: Uuid::new_v4().to_string(),
193            role: input.role.clone(),
194            prompt: input.prompt.clone(),
195            status: TaskStatus::Queued,
196            slot_id: None,
197            session_id: None,
198            result: None,
199            error: None,
200            created_at: now,
201            started_at: None,
202            finished_at: None,
203        };
204
205        let _ = self.db.insert_task(&task);
206        let data = serde_json::json!({ "role": input.role });
207        let _ = self.db.insert_event(&task.id, EventType::TaskCreated, Some(&data), now);
208
209        info!(task_id = %task.id, role = %input.role, "Task created");
210        Ok(task)
211    }
212
213    /// Process a task
214    async fn process_task(&self, task: &Task) -> Result<String> {
215        // Find slots for the role
216        let slots = self.slot_manager.get_slots_by_role(&task.role);
217        if slots.is_empty() {
218            return Err(anyhow!("No slot found for role: {}", task.role));
219        }
220
221        // Find an available slot (idle)
222        let mut target_slot: Option<Slot> = None;
223        for slot in &slots {
224            if self.process_manager.is_available(&slot.config.id) {
225                target_slot = Some(slot.clone());
226                break;
227            }
228        }
229
230        // If no available slot, try to spawn one
231        if target_slot.is_none() {
232            for slot in &slots {
233                if let Some(status) = self.process_manager.get_status(&slot.config.id) {
234                    if status.status == super::AgentStatus::Stopped {
235                        self.process_manager
236                            .spawn(slot, SpawnOptions::default())
237                            .await?;
238                        target_slot = Some(slot.clone());
239                        break;
240                    }
241                }
242            }
243        }
244
245        let target_slot =
246            target_slot.ok_or_else(|| anyhow!("No available slot for role: {}", task.role))?;
247
248        let now = chrono::Utc::now().timestamp_millis();
249
250        // Update task status
251        let _ = self.db.update_task(
252            &task.id,
253            &TaskUpdate {
254                status: Some(TaskStatus::Running),
255                slot_id: Some(target_slot.config.id.clone()),
256                started_at: Some(now),
257                ..Default::default()
258            },
259        );
260
261        let data = serde_json::json!({ "slotId": target_slot.config.id });
262        let _ = self.db.insert_event(&task.id, EventType::TaskStarted, Some(&data), now);
263
264        info!(task_id = %task.id, slot_id = %target_slot.config.id, "Task started");
265
266        // Execute task
267        match self.process_manager.execute_task(&target_slot, task).await {
268            Ok(result) => {
269                let now = chrono::Utc::now().timestamp_millis();
270
271                // Update task status
272                let _ = self.db.update_task(
273                    &task.id,
274                    &TaskUpdate {
275                        status: Some(TaskStatus::Done),
276                        session_id: Some(result.session_id.clone()),
277                        result: Some(result.result.clone()),
278                        finished_at: Some(now),
279                        ..Default::default()
280                    },
281                );
282
283                let data = serde_json::json!({ "resultLength": result.result.len() });
284                let _ = self.db.insert_event(&task.id, EventType::TaskDone, Some(&data), now);
285
286                // Add to inbox
287                self.inbox.add_message(&task.id, &task.role, &result.result);
288
289                info!(task_id = %task.id, "Task completed");
290                Ok(result.result)
291            }
292            Err(e) => {
293                let error_msg = e.to_string();
294                let now = chrono::Utc::now().timestamp_millis();
295
296                let _ = self.db.update_task(
297                    &task.id,
298                    &TaskUpdate {
299                        status: Some(TaskStatus::Failed),
300                        error: Some(error_msg.clone()),
301                        finished_at: Some(now),
302                        ..Default::default()
303                    },
304                );
305
306                let data = serde_json::json!({ "error": error_msg });
307                let _ = self.db.insert_event(&task.id, EventType::TaskFailed, Some(&data), now);
308
309                error!(task_id = %task.id, error = %error_msg, "Task failed");
310                Err(e)
311            }
312        }
313    }
314
315    /// Get task status
316    pub fn get_status(&self, task_id: &str) -> Option<Task> {
317        self.db.get_task(task_id).ok().flatten()
318    }
319
320    /// Cancel a task
321    pub async fn cancel(&self, task_id: &str) -> Result<bool> {
322        let task = match self.db.get_task(task_id).ok().flatten() {
323            Some(t) => t,
324            None => return Ok(false),
325        };
326
327        let now = chrono::Utc::now().timestamp_millis();
328
329        if task.status == TaskStatus::Queued {
330            let _ = self.db.update_task(
331                task_id,
332                &TaskUpdate {
333                    status: Some(TaskStatus::Cancelled),
334                    finished_at: Some(now),
335                    ..Default::default()
336                },
337            );
338            return Ok(true);
339        }
340
341        if task.status == TaskStatus::Running {
342            if let Some(slot_id) = &task.slot_id {
343                self.process_manager.kill(slot_id).await?;
344                let _ = self.db.update_task(
345                    task_id,
346                    &TaskUpdate {
347                        status: Some(TaskStatus::Cancelled),
348                        finished_at: Some(now),
349                        ..Default::default()
350                    },
351                );
352                return Ok(true);
353            }
354        }
355
356        Ok(false)
357    }
358
359    // ============ Process Control ============
360
361    /// Spawn an agent process
362    pub async fn spawn_agent(
363        &self,
364        slot_id: &str,
365        options: Option<SpawnOptions>,
366    ) -> Result<super::AgentProcess> {
367        let slot = self
368            .slot_manager
369            .get_slot(slot_id)
370            .ok_or_else(|| anyhow!("Slot not found: {}", slot_id))?;
371        self.process_manager
372            .spawn(&slot, options.unwrap_or_default())
373            .await
374    }
375
376    /// Kill an agent process
377    pub async fn kill_agent(&self, slot_id: &str) -> Result<()> {
378        self.process_manager.kill(slot_id).await
379    }
380
381    /// Restart an agent process
382    pub async fn restart_agent(
383        &self,
384        slot_id: &str,
385        options: Option<SpawnOptions>,
386    ) -> Result<super::AgentProcess> {
387        let slot = self
388            .slot_manager
389            .get_slot(slot_id)
390            .ok_or_else(|| anyhow!("Slot not found: {}", slot_id))?;
391        self.process_manager
392            .restart(&slot, options.unwrap_or_default())
393            .await
394    }
395
396    /// Get all agent statuses
397    pub fn get_agents(&self) -> Vec<super::AgentProcess> {
398        self.process_manager.get_all_status()
399    }
400
401    /// Get a specific agent's status
402    pub fn get_agent(&self, slot_id: &str) -> Option<super::AgentProcess> {
403        self.process_manager.get_status(slot_id)
404    }
405
406    // ============ Inbox Operations ============
407
408    /// Get inbox messages
409    pub fn get_inbox(&self, unread_only: bool, limit: usize) -> Vec<InboxMessage> {
410        self.inbox.get_messages(unread_only, limit)
411    }
412
413    /// Mark a message as read
414    pub fn mark_inbox_read(&self, message_id: &str) {
415        self.inbox.mark_read(message_id);
416    }
417
418    // ============ Slot Operations ============
419
420    /// List all slots
421    pub fn list_slots(&self) -> Vec<Slot> {
422        self.slot_manager.get_all_slots()
423    }
424
425    /// Reset a slot's session
426    pub fn reset_slot_session(&self, slot_id: &str) {
427        self.slot_manager.reset_session(slot_id);
428    }
429
430    // ============ Statistics ============
431
432    /// Get statistics
433    pub fn get_stats(&self) -> MissionStats {
434        let process_stats = self.process_manager.get_stats();
435        let slot_stats = self.slot_manager.get_stats();
436
437        MissionStats {
438            tasks: TaskStats {
439                queued: self
440                    .db
441                    .get_tasks_by_status(TaskStatus::Queued)
442                    .map(|v| v.len())
443                    .unwrap_or(0),
444                running: self
445                    .db
446                    .get_tasks_by_status(TaskStatus::Running)
447                    .map(|v| v.len())
448                    .unwrap_or(0),
449                done: self
450                    .db
451                    .get_tasks_by_status(TaskStatus::Done)
452                    .map(|v| v.len())
453                    .unwrap_or(0),
454                failed: self
455                    .db
456                    .get_tasks_by_status(TaskStatus::Failed)
457                    .map(|v| v.len())
458                    .unwrap_or(0),
459            },
460            agents: AgentStats {
461                total: process_stats.total,
462                stopped: process_stats.stopped,
463                idle: process_stats.idle,
464                busy: process_stats.busy,
465            },
466            slots: SlotStats {
467                total: slot_stats.total,
468                by_role: slot_stats.by_role,
469            },
470            inbox: InboxStats {
471                unread: self.inbox.get_unread_count(),
472            },
473        }
474    }
475
476    /// Get default execution mode
477    pub async fn get_default_mode(&self) -> ExecutionMode {
478        *self.default_mode.read().await
479    }
480
481    /// Set default execution mode
482    pub async fn set_default_mode(&self, mode: ExecutionMode) {
483        *self.default_mode.write().await = mode;
484        info!(mode = %mode.as_str(), "Default execution mode changed");
485    }
486
487    // ============ Permission Management ============
488
489    /// Get permission configuration
490    pub fn get_permission_config(&self) -> PermissionConfig {
491        self.permission_policy.get_config()
492    }
493
494    /// Set role permission rule
495    pub fn set_role_permission(&self, role: &str, rule: PermissionRule) {
496        self.permission_policy.set_role_rule(role, rule);
497        info!(role = %role, "Role permission updated");
498    }
499
500    /// Set slot permission rule
501    pub fn set_slot_permission(&self, slot_id: &str, rule: PermissionRule) {
502        self.permission_policy.set_slot_rule(slot_id, rule);
503        info!(slot_id = %slot_id, "Slot permission updated");
504    }
505
506    /// Add role auto_allow
507    pub fn add_role_auto_allow(&self, role: &str, pattern: &str) {
508        self.permission_policy.add_role_auto_allow(role, pattern);
509        info!(role = %role, pattern = %pattern, "Added role auto_allow");
510    }
511
512    /// Add slot auto_allow
513    pub fn add_slot_auto_allow(&self, slot_id: &str, pattern: &str) {
514        self.permission_policy.add_slot_auto_allow(slot_id, pattern);
515        info!(slot_id = %slot_id, pattern = %pattern, "Added slot auto_allow");
516    }
517
518    /// Reload permission configuration
519    pub fn reload_permission_config(&self) {
520        self.permission_policy.reload();
521        info!("Permission config reloaded");
522    }
523
524    /// Check tool permission
525    pub fn check_permission(
526        &self,
527        slot_id: &str,
528        role: &str,
529        tool_name: &str,
530    ) -> super::PermissionDecision {
531        self.permission_policy
532            .check_permission(slot_id, role, tool_name)
533    }
534}
535
/// Task statistics: number of tasks in each status.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TaskStats {
    /// Tasks with status `Queued`.
    pub queued: usize,
    /// Tasks with status `Running`.
    pub running: usize,
    /// Tasks with status `Done`.
    pub done: usize,
    /// Tasks with status `Failed`.
    pub failed: usize,
}

/// Agent statistics as reported by the process manager.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AgentStats {
    /// Total number of agents.
    pub total: usize,
    /// Agents that are stopped.
    pub stopped: usize,
    /// Agents that are idle.
    pub idle: usize,
    /// Agents that are busy.
    pub busy: usize,
}

/// Slot statistics as reported by the slot manager.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SlotStats {
    /// Total number of slots.
    pub total: usize,
    /// Slot count keyed by role.
    pub by_role: std::collections::HashMap<String, usize>,
}

/// Inbox statistics
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct InboxStats {
    /// Number of unread inbox messages.
    pub unread: usize,
}

/// Mission statistics: one snapshot of all counters.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MissionStats {
    /// Task counters.
    pub tasks: TaskStats,
    /// Agent counters.
    pub agents: AgentStats,
    /// Slot counters.
    pub slots: SlotStats,
    /// Inbox counters.
    pub inbox: InboxStats,
}
575
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Write a two-slot `slots.yaml` into `dir` and return
    /// `(db_path, slots_config_path)` for that directory.
    fn create_test_config(dir: &Path) -> (PathBuf, PathBuf) {
        // Create slots config
        let slots_config = r#"
slots:
  - id: slot-1
    role: worker
    description: Test worker slot
  - id: slot-2
    role: specialist
    description: Test specialist slot
"#;
        let slots_config_path = dir.join("slots.yaml");
        std::fs::write(&slots_config_path, slots_config).unwrap();

        (dir.join("mission.db"), slots_config_path)
    }

    /// Build a `MissionControl` rooted in `dir` with the given default mode
    /// and every other option left unset.
    fn build_mission_control(dir: &Path, default_mode: Option<ExecutionMode>) -> MissionControl {
        let (db_path, slots_config_path) = create_test_config(dir);
        MissionControl::new(MissionControlOptions {
            db_path,
            slots_config_path,
            permission_config_path: None,
            logs_dir: None,
            default_mode,
        })
        .unwrap()
    }

    #[tokio::test]
    async fn test_create_mission_control() {
        let dir = tempdir().unwrap();
        let mc = build_mission_control(dir.path(), None);

        assert_eq!(mc.list_slots().len(), 2);
    }

    #[tokio::test]
    async fn test_stats() {
        let dir = tempdir().unwrap();
        let mc = build_mission_control(dir.path(), None);

        let stats = mc.get_stats();
        assert_eq!(stats.slots.total, 2);
        assert_eq!(stats.agents.total, 2);
        assert_eq!(stats.agents.stopped, 2);
    }

    #[tokio::test]
    async fn test_default_mode() {
        let dir = tempdir().unwrap();
        let mc = build_mission_control(dir.path(), Some(ExecutionMode::Pty));

        assert_eq!(mc.get_default_mode().await, ExecutionMode::Pty);

        mc.set_default_mode(ExecutionMode::Batch).await;
        assert_eq!(mc.get_default_mode().await, ExecutionMode::Batch);
    }
}