// missiond_core/pty/manager.rs

//! PTY Manager - manages multiple PTY sessions.
//!
//! Unlike simple process management, `PTYManager` keeps persistent,
//! interactive sessions alive for Claude Code agents.

use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Weak};
use std::time::Duration;

use anyhow::{anyhow, Result};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, RwLock};
use tracing::{debug, error, info, warn};

use super::session::{
    ConfirmInfo, ConfirmResponse, PTYSession, PTYSessionOptions, PermissionDecision, SessionEvent,
    SessionState,
};

22// ========== Types ==========
23
24/// Information about a PTY agent
25#[derive(Debug, Clone, Serialize, Deserialize)]
26#[serde(rename_all = "camelCase")]
27pub struct PTYAgentInfo {
28    pub slot_id: String,
29    pub role: String,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub pid: Option<u32>,
32    pub state: SessionState,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub started_at: Option<i64>,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub current_task_id: Option<String>,
37    pub log_file: PathBuf,
38}
39
/// Knobs controlling how a PTY session is spawned.
#[derive(Clone, Debug, Default)]
pub struct PTYSpawnOptions {
    /// Relaunch the session automatically once it stops running.
    pub auto_restart: bool,
}

47/// Result of executing a message
48#[derive(Debug, Clone, Serialize)]
49pub struct PTYExecuteResult {
50    pub response: String,
51    pub duration_ms: u64,
52}
53
/// Static configuration of one agent slot.
#[derive(Clone, Debug)]
pub struct Slot {
    /// Unique slot identifier (also embedded in the slot's log file name).
    pub id: String,
    /// Role name handed to the permission policy.
    pub role: String,
    /// Working directory for the session; `None` lets the manager pick one.
    pub cwd: Option<PathBuf>,
}

62/// Permission policy trait
63pub trait PermissionPolicy: Send + Sync {
64    fn check_permission(&self, slot_id: &str, role: &str, tool_name: &str) -> PermissionDecision;
65}
66
67// ========== PTYManager ==========
68
69/// Manager for multiple PTY sessions
70///
71/// Handles lifecycle, message routing, and event aggregation
72/// for multiple Claude Code PTY sessions.
73pub struct PTYManager {
74    /// Active sessions by slot ID
75    sessions: Arc<RwLock<HashMap<String, Arc<RwLock<PTYSession>>>>>,
76    /// Agent info by slot ID
77    agent_info: Arc<RwLock<HashMap<String, PTYAgentInfo>>>,
78    /// Directory for log files
79    logs_dir: PathBuf,
80    /// Slots configured for auto-restart
81    auto_restart_slots: Arc<RwLock<std::collections::HashSet<String>>>,
82    /// Permission policy
83    permission_policy: Arc<RwLock<Option<Arc<dyn PermissionPolicy>>>>,
84    /// Aggregated event channel
85    event_tx: broadcast::Sender<ManagerEvent>,
86}
87
88/// Events from the manager
89#[derive(Debug, Clone)]
90pub enum ManagerEvent {
91    /// Session spawned
92    Spawned { slot_id: String },
93    /// State changed
94    StateChange {
95        slot_id: String,
96        new_state: SessionState,
97        prev_state: SessionState,
98    },
99    /// Confirmation required
100    ConfirmRequired {
101        slot_id: String,
102        prompt: String,
103        tool_info: Option<ConfirmInfo>,
104    },
105    /// Session exited
106    Exited { slot_id: String, exit_code: i32 },
107}
108
109impl PTYManager {
110    /// Create a new PTY manager
111    pub fn new(logs_dir: PathBuf) -> Self {
112        // Ensure logs directory exists
113        if !logs_dir.exists() {
114            std::fs::create_dir_all(&logs_dir).ok();
115        }
116
117        let (event_tx, _) = broadcast::channel(1000);
118
119        Self {
120            sessions: Arc::new(RwLock::new(HashMap::new())),
121            agent_info: Arc::new(RwLock::new(HashMap::new())),
122            logs_dir,
123            auto_restart_slots: Arc::new(RwLock::new(std::collections::HashSet::new())),
124            permission_policy: Arc::new(RwLock::new(None)),
125            event_tx,
126        }
127    }
128
129    /// Set permission policy
130    pub async fn set_permission_policy(&self, policy: Arc<dyn PermissionPolicy>) {
131        *self.permission_policy.write().await = Some(policy);
132        info!("Permission policy set");
133    }
134
135    /// Initialize a slot
136    pub async fn init_slot(&self, slot: &Slot) {
137        let log_file = self.logs_dir.join(format!("pty-{}.log", slot.id));
138
139        let info = PTYAgentInfo {
140            slot_id: slot.id.clone(),
141            role: slot.role.clone(),
142            pid: None,
143            state: SessionState::Exited,
144            started_at: None,
145            current_task_id: None,
146            log_file,
147        };
148
149        self.agent_info.write().await.insert(slot.id.clone(), info);
150        debug!(slot_id = %slot.id, role = %slot.role, "PTY slot initialized");
151    }
152
153    /// Spawn a PTY session for a slot
154    pub async fn spawn(&self, slot: &Slot, options: PTYSpawnOptions) -> Result<PTYAgentInfo> {
155        let info = {
156            let agent_info = self.agent_info.read().await;
157            agent_info
158                .get(&slot.id)
159                .cloned()
160                .ok_or_else(|| anyhow!("Slot not initialized: {}", slot.id))?
161        };
162
163        // Check for existing running session
164        {
165            let sessions = self.sessions.read().await;
166            if let Some(session) = sessions.get(&slot.id) {
167                let session = session.read().await;
168                if session.is_running() {
169                    return Err(anyhow!("PTY session already running: {}", slot.id));
170                }
171            }
172        }
173
174        // Track auto-restart
175        if options.auto_restart {
176            self.auto_restart_slots.write().await.insert(slot.id.clone());
177        }
178
179        // Create session
180        let cwd = slot.cwd.clone().unwrap_or_else(|| {
181            std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/"))
182        });
183
184        let mut session = PTYSession::new(PTYSessionOptions {
185            slot_id: slot.id.clone(),
186            cwd,
187            env: None,
188            log_file: Some(info.log_file.clone()),
189            cols: 120,
190            rows: 30,
191        })?;
192
193        // Set up permission check
194        let policy = self.permission_policy.read().await.clone();
195        let slot_id = slot.id.clone();
196        let role = slot.role.clone();
197        if let Some(policy) = policy {
198            session
199                .set_permission_check(move |confirm_info: &ConfirmInfo| {
200                    let tool_name = confirm_info
201                        .tool
202                        .as_ref()
203                        .map(|t| t.name.as_str())
204                        .unwrap_or("");
205                    policy.check_permission(&slot_id, &role, tool_name)
206                })
207                .await;
208        }
209
210        // Set up event forwarding
211        let event_tx = self.event_tx.clone();
212        let slot_id_for_events = slot.id.clone();
213        let mut session_rx = session.subscribe();
214
215        tokio::spawn(async move {
216            while let Ok(event) = session_rx.recv().await {
217                match event {
218                    SessionEvent::StateChange {
219                        new_state,
220                        prev_state,
221                    } => {
222                        let _ = event_tx.send(ManagerEvent::StateChange {
223                            slot_id: slot_id_for_events.clone(),
224                            new_state,
225                            prev_state,
226                        });
227                    }
228                    SessionEvent::ConfirmRequired { prompt, info } => {
229                        let _ = event_tx.send(ManagerEvent::ConfirmRequired {
230                            slot_id: slot_id_for_events.clone(),
231                            prompt,
232                            tool_info: info,
233                        });
234                    }
235                    SessionEvent::Exit(code) => {
236                        let _ = event_tx.send(ManagerEvent::Exited {
237                            slot_id: slot_id_for_events.clone(),
238                            exit_code: code,
239                        });
240                        break;
241                    }
242                    _ => {}
243                }
244            }
245        });
246
247        // Start session
248        session.start().await?;
249
250        let pid = session.pid().await;
251        let state = session.state().await;
252
253        // Update agent info
254        {
255            let mut agent_info = self.agent_info.write().await;
256            if let Some(info) = agent_info.get_mut(&slot.id) {
257                info.pid = pid;
258                info.state = state;
259                info.started_at = Some(Utc::now().timestamp_millis());
260            }
261        }
262
263        // Store session
264        {
265            let mut sessions = self.sessions.write().await;
266            sessions.insert(slot.id.clone(), Arc::new(RwLock::new(session)));
267        }
268
269        info!(slot_id = %slot.id, pid = ?pid, "PTY session started");
270        let _ = self.event_tx.send(ManagerEvent::Spawned {
271            slot_id: slot.id.clone(),
272        });
273
274        // Set up auto-restart handler
275        let manager_sessions = Arc::clone(&self.sessions);
276        let manager_info = Arc::clone(&self.agent_info);
277        let manager_auto_restart = Arc::clone(&self.auto_restart_slots);
278        let manager_policy = Arc::clone(&self.permission_policy);
279        let manager_event_tx = self.event_tx.clone();
280        let slot_for_restart = slot.clone();
281
282        tokio::spawn(async move {
283            // Wait for session to exit
284            let mut interval = tokio::time::interval(Duration::from_secs(1));
285            loop {
286                interval.tick().await;
287
288                let should_restart = {
289                    let sessions = manager_sessions.read().await;
290                    if let Some(session) = sessions.get(&slot_for_restart.id) {
291                        let session = session.read().await;
292                        !session.is_running()
293                    } else {
294                        false
295                    }
296                };
297
298                if should_restart {
299                    // Check if auto-restart is enabled
300                    let auto_restart = manager_auto_restart
301                        .read()
302                        .await
303                        .contains(&slot_for_restart.id);
304
305                    if auto_restart {
306                        info!(slot_id = %slot_for_restart.id, "Auto-restarting PTY session");
307
308                        // Create new session
309                        let cwd = slot_for_restart.cwd.clone().unwrap_or_else(|| {
310                            std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/"))
311                        });
312
313                        let log_file = {
314                            let info = manager_info.read().await;
315                            info.get(&slot_for_restart.id)
316                                .map(|i| i.log_file.clone())
317                        };
318
319                        if let Ok(mut new_session) = PTYSession::new(PTYSessionOptions {
320                            slot_id: slot_for_restart.id.clone(),
321                            cwd,
322                            env: None,
323                            log_file,
324                            cols: 120,
325                            rows: 30,
326                        }) {
327                            // Set up permission check
328                            let policy = manager_policy.read().await.clone();
329                            let slot_id = slot_for_restart.id.clone();
330                            let role = slot_for_restart.role.clone();
331                            if let Some(policy) = policy {
332                                new_session
333                                    .set_permission_check(move |confirm_info: &ConfirmInfo| {
334                                        let tool_name = confirm_info
335                                            .tool
336                                            .as_ref()
337                                            .map(|t| t.name.as_str())
338                                            .unwrap_or("");
339                                        policy.check_permission(&slot_id, &role, tool_name)
340                                    })
341                                    .await;
342                            }
343
344                            if new_session.start().await.is_ok() {
345                                let pid = new_session.pid().await;
346                                let state = new_session.state().await;
347
348                                // Update info
349                                {
350                                    let mut info = manager_info.write().await;
351                                    if let Some(agent_info) = info.get_mut(&slot_for_restart.id) {
352                                        agent_info.pid = pid;
353                                        agent_info.state = state;
354                                        agent_info.started_at =
355                                            Some(Utc::now().timestamp_millis());
356                                    }
357                                }
358
359                                // Store session
360                                {
361                                    let mut sessions = manager_sessions.write().await;
362                                    sessions.insert(
363                                        slot_for_restart.id.clone(),
364                                        Arc::new(RwLock::new(new_session)),
365                                    );
366                                }
367
368                                let _ = manager_event_tx.send(ManagerEvent::Spawned {
369                                    slot_id: slot_for_restart.id.clone(),
370                                });
371
372                                info!(slot_id = %slot_for_restart.id, "Auto-restart successful");
373                            } else {
374                                error!(slot_id = %slot_for_restart.id, "Auto-restart failed");
375                            }
376                        }
377                    }
378
379                    break;
380                }
381            }
382        });
383
384        // Return current info
385        let agent_info = self.agent_info.read().await;
386        Ok(agent_info.get(&slot.id).cloned().unwrap())
387    }
388
389    /// Send message and wait for response
390    pub async fn send(
391        &self,
392        slot_id: &str,
393        message: &str,
394        timeout_ms: u64,
395    ) -> Result<PTYExecuteResult> {
396        let session = {
397            let sessions = self.sessions.read().await;
398            sessions
399                .get(slot_id)
400                .cloned()
401                .ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
402        };
403
404        // Update state to thinking
405        {
406            let mut agent_info = self.agent_info.write().await;
407            if let Some(info) = agent_info.get_mut(slot_id) {
408                info.state = SessionState::Thinking;
409            }
410        }
411
412        let start = std::time::Instant::now();
413
414        let response = {
415            let session = session.read().await;
416            session.send(message, timeout_ms).await?
417        };
418
419        let duration_ms = start.elapsed().as_millis() as u64;
420
421        info!(
422            slot_id = slot_id,
423            message_len = message.len(),
424            response_len = response.len(),
425            duration_ms = duration_ms,
426            "Message sent and response received"
427        );
428
429        Ok(PTYExecuteResult {
430            response,
431            duration_ms,
432        })
433    }
434
435    /// Subscribe to session events (raw data/state/exit/etc.)
436    pub async fn subscribe_session(
437        &self,
438        slot_id: &str,
439    ) -> Result<broadcast::Receiver<SessionEvent>> {
440        let session = {
441            let sessions = self.sessions.read().await;
442            sessions
443                .get(slot_id)
444                .cloned()
445                .ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
446        };
447
448        let session = session.read().await;
449        Ok(session.subscribe())
450    }
451
452    /// Execute a task
453    pub async fn execute_task(
454        &self,
455        slot: &Slot,
456        task_id: &str,
457        prompt: &str,
458    ) -> Result<PTYExecuteResult> {
459        // Set current task
460        {
461            let mut agent_info = self.agent_info.write().await;
462            if let Some(info) = agent_info.get_mut(&slot.id) {
463                info.current_task_id = Some(task_id.to_string());
464            }
465        }
466
467        let result = self.send(&slot.id, prompt, 300_000).await;
468
469        // Clear current task
470        {
471            let mut agent_info = self.agent_info.write().await;
472            if let Some(info) = agent_info.get_mut(&slot.id) {
473                info.current_task_id = None;
474            }
475        }
476
477        result
478    }
479
480    /// Send confirmation response
481    pub async fn confirm(&self, slot_id: &str, response: ConfirmResponse) -> Result<()> {
482        let session = {
483            let sessions = self.sessions.read().await;
484            sessions
485                .get(slot_id)
486                .cloned()
487                .ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
488        };
489
490        let session = session.read().await;
491        session.confirm(response).await
492    }
493
494    /// Write raw input to the PTY (no state checks)
495    pub async fn write(&self, slot_id: &str, data: &str) -> Result<()> {
496        let session = {
497            let sessions = self.sessions.read().await;
498            sessions
499                .get(slot_id)
500                .cloned()
501                .ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
502        };
503
504        let session = session.read().await;
505        session.write(data).await
506    }
507
508    /// Send interrupt signal
509    pub async fn interrupt(&self, slot_id: &str) -> Result<()> {
510        let session = {
511            let sessions = self.sessions.read().await;
512            sessions
513                .get(slot_id)
514                .cloned()
515                .ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
516        };
517
518        let session = session.read().await;
519        session.interrupt().await
520    }
521
522    /// Get screen content
523    pub async fn get_screen(&self, slot_id: &str) -> Result<String> {
524        let session = {
525            let sessions = self.sessions.read().await;
526            sessions
527                .get(slot_id)
528                .cloned()
529                .ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
530        };
531
532        let session = session.read().await;
533        Ok(session.get_screen_text().await)
534    }
535
536    /// Get last N lines
537    pub async fn get_last_lines(&self, slot_id: &str, n: usize) -> Result<Vec<String>> {
538        let session = {
539            let sessions = self.sessions.read().await;
540            sessions
541                .get(slot_id)
542                .cloned()
543                .ok_or_else(|| anyhow!("No PTY session for slot: {}", slot_id))?
544        };
545
546        let session = session.read().await;
547        Ok(session.get_last_lines(n).await)
548    }
549
550    /// Get chat history
551    pub async fn get_history(&self, slot_id: &str) -> Vec<super::session::Message> {
552        let sessions = self.sessions.read().await;
553        if let Some(session) = sessions.get(slot_id) {
554            let session = session.read().await;
555            session.history().await
556        } else {
557            Vec::new()
558        }
559    }
560
561    /// Kill a session
562    pub async fn kill(&self, slot_id: &str) -> Result<()> {
563        // Remove from auto-restart
564        self.auto_restart_slots.write().await.remove(slot_id);
565
566        // Get and close session
567        let session = {
568            let mut sessions = self.sessions.write().await;
569            sessions.remove(slot_id)
570        };
571
572        if let Some(session) = session {
573            let mut session = session.write().await;
574            session.close().await?;
575        }
576
577        // Update agent info
578        {
579            let mut agent_info = self.agent_info.write().await;
580            if let Some(info) = agent_info.get_mut(slot_id) {
581                info.state = SessionState::Exited;
582                info.pid = None;
583            }
584        }
585
586        info!(slot_id = slot_id, "PTY session killed");
587        Ok(())
588    }
589
590    /// Restart a session
591    pub async fn restart(&self, slot: &Slot, options: PTYSpawnOptions) -> Result<PTYAgentInfo> {
592        self.kill(&slot.id).await?;
593        self.spawn(slot, options).await
594    }
595
596    /// Get session status
597    pub async fn get_status(&self, slot_id: &str) -> Option<PTYAgentInfo> {
598        self.agent_info.read().await.get(slot_id).cloned()
599    }
600
601    /// Get all session statuses
602    pub async fn get_all_status(&self) -> Vec<PTYAgentInfo> {
603        self.agent_info.read().await.values().cloned().collect()
604    }
605
606    /// Check if session is available (idle)
607    pub async fn is_available(&self, slot_id: &str) -> bool {
608        if let Some(info) = self.agent_info.read().await.get(slot_id) {
609            info.state == SessionState::Idle
610        } else {
611            false
612        }
613    }
614
615    /// Check if session is running
616    pub async fn is_running(&self, slot_id: &str) -> bool {
617        let sessions = self.sessions.read().await;
618        if let Some(session) = sessions.get(slot_id) {
619            let session = session.read().await;
620            session.is_running()
621        } else {
622            false
623        }
624    }
625
626    /// Get statistics
627    pub async fn get_stats(&self) -> ManagerStats {
628        let mut stats = ManagerStats::default();
629
630        let agent_info = self.agent_info.read().await;
631        stats.total = agent_info.len();
632
633        for info in agent_info.values() {
634            match info.state {
635                SessionState::Idle => {
636                    stats.idle += 1;
637                    stats.running += 1;
638                }
639                SessionState::Thinking
640                | SessionState::Responding
641                | SessionState::ToolRunning
642                | SessionState::Confirming => {
643                    stats.busy += 1;
644                    stats.running += 1;
645                }
646                SessionState::Starting => {
647                    stats.running += 1;
648                }
649                SessionState::Exited | SessionState::Error => {
650                    stats.stopped += 1;
651                }
652            }
653        }
654
655        stats
656    }
657
658    /// Subscribe to manager events
659    pub fn subscribe(&self) -> broadcast::Receiver<ManagerEvent> {
660        self.event_tx.subscribe()
661    }
662
663    /// Shutdown all sessions
664    pub async fn shutdown(&self) {
665        info!("Shutting down all PTY sessions...");
666
667        // Clear auto-restart
668        self.auto_restart_slots.write().await.clear();
669
670        // Collect slot IDs
671        let slot_ids: Vec<String> = {
672            let sessions = self.sessions.read().await;
673            sessions.keys().cloned().collect()
674        };
675
676        // Kill all sessions
677        for slot_id in slot_ids {
678            if let Err(e) = self.kill(&slot_id).await {
679                error!(slot_id = %slot_id, error = %e, "Error killing PTY session");
680            }
681        }
682
683        info!("All PTY sessions shut down");
684    }
685}
686
687/// Manager statistics
688#[derive(Debug, Clone, Default, Serialize)]
689pub struct ManagerStats {
690    pub total: usize,
691    pub running: usize,
692    pub idle: usize,
693    pub busy: usize,
694    pub stopped: usize,
695}