Skip to main content

missiond_core/pty/
session.rs

1//! PTY Session - Interactive terminal session for Claude Code
2//!
3//! Architecture: portable-pty (process) + alacritty_terminal (emulation) + semantic (detection)
4//!
5//! - portable-pty: Handles low-level PTY process communication
6//! - alacritty_terminal: Parses ANSI sequences, maintains virtual screen
7//! - semantic: State detection and confirmation dialog parsing
8
9use std::collections::HashMap;
10use std::io::{Read, Write as IoWrite};
11use std::path::PathBuf;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use alacritty_terminal::event::{Event as TermEvent, EventListener};
17use alacritty_terminal::grid::Dimensions;
18use alacritty_terminal::term::{Config as TermConfig, Term};
19use anyhow::{anyhow, Result};
20
21/// Terminal size for creating Term
22struct TermSize {
23    cols: usize,
24    rows: usize,
25}
26
27impl Dimensions for TermSize {
28    fn total_lines(&self) -> usize {
29        self.rows
30    }
31
32    fn screen_lines(&self) -> usize {
33        self.rows
34    }
35
36    fn columns(&self) -> usize {
37        self.cols
38    }
39}
40use chrono::Utc;
41use portable_pty::{native_pty_system, CommandBuilder, PtySize};
42use serde::{Deserialize, Serialize};
43use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock};
44use tokio::time::{interval, timeout};
45use tracing::{debug, error, info, warn};
46use uuid::Uuid;
47
48use super::extractor::{IncrementalExtractor, StableTextOp, TextAssembler};
49use crate::semantic::{
50    ClaudeCodeConfirmParser, ClaudeCodeStateParser, ClaudeCodeStatusParser,
51    ClaudeCodeToolOutputParser,
52    ConfirmParser, ParserContext, StateParser, StatusParser, ToolOutputParser,
53    default_registry,
54    ClaudeCodeStatus, ClaudeCodeToolOutput, ClaudeCodeTitle,
55    ConfirmInfo as SemanticConfirmInfo,
56    State as SemanticState,
57};
58
59// ========== Types ==========
60
61/// Session state machine
62#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64pub enum SessionState {
65    /// Starting up
66    Starting,
67    /// Waiting for input (shows >)
68    Idle,
69    /// Claude is thinking (shows spinner)
70    Thinking,
71    /// Claude is outputting response
72    Responding,
73    /// Tool is executing
74    ToolRunning,
75    /// Waiting for confirmation (Y/n)
76    Confirming,
77    /// Error state
78    Error,
79    /// Session has exited
80    Exited,
81}
82
83impl SessionState {
84    /// Check if this is a processing state (Claude is active)
85    pub fn is_processing(&self) -> bool {
86        matches!(
87            self,
88            SessionState::Thinking | SessionState::ToolRunning | SessionState::Responding
89        )
90    }
91}
92
93/// Chat message
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct Message {
96    pub role: MessageRole,
97    pub content: String,
98    pub timestamp: i64,
99}
100
101#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
102#[serde(rename_all = "lowercase")]
103pub enum MessageRole {
104    User,
105    Assistant,
106}
107
108/// Source of screen text
109#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
110#[serde(rename_all = "snake_case")]
111pub enum ScreenTextSource {
112    Assistant,
113    User,
114    Tool,
115    Ui,
116    Unknown,
117}
118
119/// Text output event (streaming or complete)
120#[derive(Debug, Clone, Serialize)]
121#[serde(tag = "type", rename_all = "snake_case")]
122pub enum TextOutputEvent {
123    Stream {
124        turn_id: u64,
125        seq: u64,
126        content: String,
127        timestamp: i64,
128    },
129    Complete {
130        turn_id: u64,
131        content: String,
132        timestamp: i64,
133    },
134}
135
136/// Screen text event for non-assistant content
137#[derive(Debug, Clone, Serialize)]
138pub struct ScreenTextEvent {
139    pub source: ScreenTextSource,
140    pub kind: String,
141    pub y: usize,
142    pub content: String,
143    pub timestamp: i64,
144    pub turn_id: Option<u64>,
145}
146
147/// Tool information from confirmation dialog
148#[derive(Debug, Clone, Serialize, Deserialize)]
149pub struct ToolInfo {
150    pub name: String,
151    pub mcp_server: Option<String>,
152    pub params: HashMap<String, serde_json::Value>,
153}
154
155/// Confirmation dialog information
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct ConfirmInfo {
158    #[serde(rename = "type")]
159    pub confirm_type: String,
160    pub tool: Option<ToolInfo>,
161    pub options: Vec<String>,
162    pub selected: usize,
163}
164
165/// Permission decision for tool execution
166#[derive(Debug, Clone, Copy, PartialEq, Eq)]
167pub enum PermissionDecision {
168    /// Auto-approve the tool
169    Allow,
170    /// Auto-deny the tool
171    Deny,
172    /// Require manual confirmation
173    Confirm,
174}
175
176/// PTY session options
177#[derive(Debug, Clone)]
178pub struct PTYSessionOptions {
179    pub slot_id: String,
180    pub cwd: PathBuf,
181    pub env: Option<HashMap<String, String>>,
182    pub log_file: Option<PathBuf>,
183    pub cols: u16,
184    pub rows: u16,
185}
186
187impl Default for PTYSessionOptions {
188    fn default() -> Self {
189        Self {
190            slot_id: Uuid::new_v4().to_string(),
191            cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/")),
192            env: None,
193            log_file: None,
194            cols: 120,
195            rows: 30,
196        }
197    }
198}
199
200/// Event listener for alacritty terminal
201struct SessionEventListener {
202    sender: mpsc::UnboundedSender<TermEvent>,
203}
204
205impl EventListener for SessionEventListener {
206    fn send_event(&self, event: TermEvent) {
207        let _ = self.sender.send(event);
208    }
209}
210
211// ========== PTYSession ==========
212
213/// Interactive PTY session for Claude Code
214///
215/// Manages a single Claude Code process with terminal emulation,
216/// state detection, and streaming text extraction.
217pub struct PTYSession {
218    /// Unique session ID
219    pub id: String,
220    /// Slot ID this session belongs to
221    pub slot_id: String,
222    /// Working directory
223    pub cwd: PathBuf,
224    /// Terminal dimensions
225    pub cols: u16,
226    pub rows: u16,
227
228    // Internal state
229    state: Arc<RwLock<SessionState>>,
230    history: Arc<RwLock<Vec<Message>>>,
231    terminal_title: Arc<RwLock<String>>,
232    pending_tool_confirm: Arc<RwLock<Option<ConfirmInfo>>>,
233    permission_check: Arc<RwLock<Option<Box<dyn Fn(&ConfirmInfo) -> PermissionDecision + Send + Sync>>>>,
234
235    // PTY process
236    pty_writer: Arc<Mutex<Option<Box<dyn IoWrite + Send>>>>,
237    pty_pid: Arc<RwLock<Option<u32>>>,
238    running: Arc<AtomicBool>,
239
240    // Terminal emulation
241    term: Arc<Mutex<Term<SessionEventListener>>>,
242
243    // Text extraction
244    extractor: Arc<Mutex<IncrementalExtractor>>,
245    text_assembler: Arc<Mutex<TextAssembler>>,
246    current_turn_id: Arc<RwLock<Option<u64>>>,
247    stream_seq: Arc<RwLock<u64>>,
248    turn_counter: Arc<RwLock<u64>>,
249    line_source_by_y: Arc<RwLock<HashMap<usize, ScreenTextSource>>>,
250    assistant_block_active: Arc<AtomicBool>,
251
252    // Event channels
253    event_tx: broadcast::Sender<SessionEvent>,
254    state_change_tx: broadcast::Sender<(SessionState, SessionState)>,
255    shutdown_tx: Option<oneshot::Sender<()>>,
256
257    // Logging
258    log_file: Option<PathBuf>,
259}
260
261/// Events emitted by the session
262#[derive(Debug, Clone)]
263pub enum SessionEvent {
264    /// Raw data from PTY
265    Data(Vec<u8>),
266    /// State changed
267    StateChange {
268        new_state: SessionState,
269        prev_state: SessionState,
270    },
271    /// Text output (stream or complete)
272    TextOutput(TextOutputEvent),
273    /// Screen text (non-assistant)
274    ScreenText(ScreenTextEvent),
275    /// Confirmation required
276    ConfirmRequired {
277        prompt: String,
278        info: Option<ConfirmInfo>,
279    },
280    /// Status bar update (spinner + status text)
281    StatusUpdate(ClaudeCodeStatus),
282    /// Tool output parsed
283    ToolOutput(ClaudeCodeToolOutput),
284    /// Terminal title changed
285    TitleChange(ClaudeCodeTitle),
286    /// Session exited
287    Exit(i32),
288}
289
290impl PTYSession {
291    /// Create a new PTY session
292    pub fn new(options: PTYSessionOptions) -> Result<Self> {
293        let id = format!(
294            "pty-{}-{}",
295            Utc::now().timestamp_millis(),
296            &Uuid::new_v4().to_string()[..8]
297        );
298
299        // Create terminal event channel
300        let (term_event_tx, _term_event_rx) = mpsc::unbounded_channel();
301        let event_listener = SessionEventListener {
302            sender: term_event_tx,
303        };
304
305        // Create virtual terminal
306        let term_config = TermConfig::default();
307        let term_size = TermSize {
308            cols: options.cols as usize,
309            rows: options.rows as usize,
310        };
311        let term = Term::new(term_config, &term_size, event_listener);
312
313        // Create event channels
314        let (event_tx, _) = broadcast::channel(1000);
315        let (state_change_tx, _) = broadcast::channel(100);
316
317        Ok(Self {
318            id,
319            slot_id: options.slot_id,
320            cwd: options.cwd,
321            cols: options.cols,
322            rows: options.rows,
323
324            state: Arc::new(RwLock::new(SessionState::Starting)),
325            history: Arc::new(RwLock::new(Vec::new())),
326            terminal_title: Arc::new(RwLock::new(String::new())),
327            pending_tool_confirm: Arc::new(RwLock::new(None)),
328            permission_check: Arc::new(RwLock::new(None)),
329
330            pty_writer: Arc::new(Mutex::new(None)),
331            pty_pid: Arc::new(RwLock::new(None)),
332            running: Arc::new(AtomicBool::new(false)),
333
334            term: Arc::new(Mutex::new(term)),
335            extractor: Arc::new(Mutex::new(IncrementalExtractor::new(
336                options.rows as usize,
337                None,
338            ))),
339            text_assembler: Arc::new(Mutex::new(TextAssembler::new())),
340            current_turn_id: Arc::new(RwLock::new(None)),
341            stream_seq: Arc::new(RwLock::new(0)),
342            turn_counter: Arc::new(RwLock::new(0)),
343            line_source_by_y: Arc::new(RwLock::new(HashMap::new())),
344            assistant_block_active: Arc::new(AtomicBool::new(false)),
345
346            event_tx,
347            state_change_tx,
348            shutdown_tx: None,
349            log_file: options.log_file,
350        })
351    }
352
353    // ========== Getters ==========
354
355    /// Get current state
356    pub async fn state(&self) -> SessionState {
357        *self.state.read().await
358    }
359
360    /// Get chat history
361    pub async fn history(&self) -> Vec<Message> {
362        self.history.read().await.clone()
363    }
364
365    /// Check if session is running
366    pub fn is_running(&self) -> bool {
367        self.running.load(Ordering::SeqCst)
368    }
369
370    /// Get process ID
371    pub async fn pid(&self) -> Option<u32> {
372        *self.pty_pid.read().await
373    }
374
375    /// Get pending tool confirmation
376    pub async fn pending_tool_confirm(&self) -> Option<ConfirmInfo> {
377        self.pending_tool_confirm.read().await.clone()
378    }
379
380    /// Get terminal title
381    pub async fn terminal_title(&self) -> String {
382        self.terminal_title.read().await.clone()
383    }
384
385    // ========== Screen Reading ==========
386
387    /// Get current screen text
388    pub async fn get_screen_text(&self) -> String {
389        let term = self.term.lock().await;
390        let grid = term.grid();
391        let mut lines = Vec::new();
392
393        let total_lines = grid.total_lines();
394        let rows = grid.screen_lines();
395        let start = if total_lines > rows {
396            total_lines - rows
397        } else {
398            0
399        };
400
401        for y in start..total_lines {
402            let line = alacritty_terminal::index::Line(y as i32);
403            if y < grid.total_lines() {
404                let row = &grid[line];
405                let text: String = row.into_iter().map(|cell| cell.c).collect();
406                lines.push(text.trim_end().to_string());
407            }
408        }
409
410        lines.join("\n")
411    }
412
413    /// Get last N lines
414    pub async fn get_last_lines(&self, n: usize) -> Vec<String> {
415        let term = self.term.lock().await;
416        let grid = term.grid();
417        let mut lines = Vec::new();
418
419        let total_lines = grid.total_lines();
420        let start = if total_lines > n { total_lines - n } else { 0 };
421
422        for y in start..total_lines {
423            let line = alacritty_terminal::index::Line(y as i32);
424            if y < grid.total_lines() {
425                let row = &grid[line];
426                let text: String = row.into_iter().map(|cell| cell.c).collect();
427                lines.push(text.trim_end().to_string());
428            }
429        }
430
431        lines
432    }
433
434    // ========== Lifecycle ==========
435
436    /// Start the PTY session
437    pub async fn start(&mut self) -> Result<()> {
438        if self.running.load(Ordering::SeqCst) {
439            return Err(anyhow!("Session already started"));
440        }
441
442        info!(slot_id = %self.slot_id, cwd = %self.cwd.display(), "Starting PTY session");
443
444        // Create PTY
445        let pty_system = native_pty_system();
446        let pty_pair = pty_system.openpty(PtySize {
447            rows: self.rows,
448            cols: self.cols,
449            pixel_width: 0,
450            pixel_height: 0,
451        })?;
452
453        // Build command: claude --add-dir "cwd"
454        let mut cmd = CommandBuilder::new("zsh");
455        cmd.args([
456            "-l",
457            "-c",
458            &format!("claude --add-dir \"{}\"", self.cwd.display()),
459        ]);
460        cmd.cwd(&self.cwd);
461
462        // Set environment
463        cmd.env("TERM", "xterm-256color");
464        if let Ok(path) = std::env::var("PATH") {
465            cmd.env("PATH", path);
466        }
467        if let Ok(home) = std::env::var("HOME") {
468            cmd.env("HOME", home);
469        }
470
471        // Spawn child process
472        let mut child = pty_pair.slave.spawn_command(cmd)?;
473        let pid = child.process_id().unwrap_or(0);
474        *self.pty_pid.write().await = Some(pid);
475        info!(pid = pid, "PTY spawned");
476
477        // Get writer
478        let writer = pty_pair.master.take_writer()?;
479        *self.pty_writer.lock().await = Some(writer);
480
481        // Get reader
482        let reader = pty_pair.master.try_clone_reader()?;
483
484        self.running.store(true, Ordering::SeqCst);
485
486        // Create shutdown channel
487        let (shutdown_tx, shutdown_rx) = oneshot::channel();
488        self.shutdown_tx = Some(shutdown_tx);
489
490        // Spawn read task
491        let term = Arc::clone(&self.term);
492        let event_tx = self.event_tx.clone();
493        let running = Arc::clone(&self.running);
494
495        tokio::spawn(async move {
496            Self::read_loop(reader, term, event_tx, running, shutdown_rx).await;
497        });
498
499        // Spawn state check task
500        let session_state = Arc::clone(&self.state);
501        let term_for_check = Arc::clone(&self.term);
502        let extractor = Arc::clone(&self.extractor);
503        let text_assembler = Arc::clone(&self.text_assembler);
504        let current_turn = Arc::clone(&self.current_turn_id);
505        let stream_seq = Arc::clone(&self.stream_seq);
506        let turn_counter = Arc::clone(&self.turn_counter);
507        let line_source = Arc::clone(&self.line_source_by_y);
508        let assistant_active = Arc::clone(&self.assistant_block_active);
509        let state_change_tx = self.state_change_tx.clone();
510        let event_tx_for_check = self.event_tx.clone();
511        let running_for_check = Arc::clone(&self.running);
512        let pending_confirm = Arc::clone(&self.pending_tool_confirm);
513        let permission_check = Arc::clone(&self.permission_check);
514        let pty_writer = Arc::clone(&self.pty_writer);
515
516        tokio::spawn(async move {
517            Self::state_check_loop(
518                session_state,
519                term_for_check,
520                extractor,
521                text_assembler,
522                current_turn,
523                stream_seq,
524                turn_counter,
525                line_source,
526                assistant_active,
527                state_change_tx,
528                event_tx_for_check,
529                running_for_check,
530                pending_confirm,
531                permission_check,
532                pty_writer,
533            )
534            .await;
535        });
536
537        // Wait for child exit in background
538        let event_tx_for_exit = self.event_tx.clone();
539        let running_for_exit = Arc::clone(&self.running);
540        let state_for_exit = Arc::clone(&self.state);
541
542        tokio::spawn(async move {
543            // Wait for child to exit (blocking in thread pool)
544            let exit_status = tokio::task::spawn_blocking(move || child.wait())
545                .await
546                .ok()
547                .and_then(|r| r.ok());
548
549            let exit_code = exit_status
550                .map(|s| s.exit_code() as i32)
551                .unwrap_or(-1);
552
553            running_for_exit.store(false, Ordering::SeqCst);
554            *state_for_exit.write().await = SessionState::Exited;
555
556            let _ = event_tx_for_exit.send(SessionEvent::Exit(exit_code));
557            info!(exit_code = exit_code, "PTY exited");
558        });
559
560        // Wait for idle state (Claude ready)
561        self.wait_for_state(SessionState::Idle, Duration::from_secs(60))
562            .await?;
563
564        Ok(())
565    }
566
567    /// Read loop - reads from PTY and feeds to terminal
568    async fn read_loop(
569        reader: Box<dyn Read + Send>,
570        _term: Arc<Mutex<Term<SessionEventListener>>>,
571        event_tx: broadcast::Sender<SessionEvent>,
572        running: Arc<AtomicBool>,
573        _shutdown_rx: oneshot::Receiver<()>,
574    ) {
575        // Move reader into a thread that will do blocking reads
576        let running_clone = Arc::clone(&running);
577
578        tokio::task::spawn_blocking(move || {
579            let mut reader = reader;
580            let mut buf = [0u8; 4096];
581
582            while running_clone.load(Ordering::SeqCst) {
583                match reader.read(&mut buf) {
584                    Ok(0) => break, // EOF
585                    Ok(n) => {
586                        let data = buf[..n].to_vec();
587                        // Note: We can't easily feed to terminal from here since Term is not Send
588                        // The terminal feeding should happen in the main async context
589                        // For now, just emit the data event
590                        let _ = event_tx.send(SessionEvent::Data(data));
591                    }
592                    Err(e) => {
593                        error!(error = %e, "PTY read error");
594                        break;
595                    }
596                }
597            }
598        });
599    }
600
601    /// State check loop - periodically checks terminal state
602    #[allow(clippy::too_many_arguments)]
603    async fn state_check_loop(
604        state: Arc<RwLock<SessionState>>,
605        term: Arc<Mutex<Term<SessionEventListener>>>,
606        extractor: Arc<Mutex<IncrementalExtractor>>,
607        text_assembler: Arc<Mutex<TextAssembler>>,
608        current_turn_id: Arc<RwLock<Option<u64>>>,
609        stream_seq: Arc<RwLock<u64>>,
610        turn_counter: Arc<RwLock<u64>>,
611        line_source_by_y: Arc<RwLock<HashMap<usize, ScreenTextSource>>>,
612        assistant_block_active: Arc<AtomicBool>,
613        state_change_tx: broadcast::Sender<(SessionState, SessionState)>,
614        event_tx: broadcast::Sender<SessionEvent>,
615        running: Arc<AtomicBool>,
616        pending_tool_confirm: Arc<RwLock<Option<ConfirmInfo>>>,
617        permission_check: Arc<RwLock<Option<Box<dyn Fn(&ConfirmInfo) -> PermissionDecision + Send + Sync>>>>,
618        pty_writer: Arc<Mutex<Option<Box<dyn IoWrite + Send>>>>,
619    ) {
620        let mut check_interval = interval(Duration::from_millis(100));
621
622        // Create parsers (stateless, can be reused)
623        let state_parser = ClaudeCodeStateParser::new();
624        let confirm_parser = ClaudeCodeConfirmParser::new();
625        let status_parser = ClaudeCodeStatusParser::new();
626        let tool_parser = ClaudeCodeToolOutputParser::new();
627        let fingerprint_registry = default_registry();
628
629        while running.load(Ordering::SeqCst) {
630            check_interval.tick().await;
631
632            // Extract frame delta
633            let delta = {
634                let term_guard = term.lock().await;
635                let mut extractor_guard = extractor.lock().await;
636                extractor_guard.extract(&*term_guard)
637            };
638
639            // Get screen text for state detection
640            let last_lines = {
641                let term_guard = term.lock().await;
642                let grid = term_guard.grid();
643                let mut lines = Vec::new();
644                let total = grid.total_lines();
645                let start = if total > 20 { total - 20 } else { 0 };
646                for y in start..total {
647                    let line = alacritty_terminal::index::Line(y as i32);
648                    if y < total {
649                        let row = &grid[line];
650                        let text: String = row.into_iter().map(|cell| cell.c).collect();
651                        lines.push(text.trim_end().to_string());
652                    }
653                }
654                lines
655            };
656
657            // Create ParserContext with current state
658            let current_state = *state.read().await;
659            let context = ParserContext::new(last_lines.clone())
660                .with_state(current_state_to_semantic(current_state));
661
662            // Use FingerprintRegistry for quick hints
663            let hints = fingerprint_registry.extract(&context).hints;
664
665            // Detect state using semantic StateParser
666            let detected_result = state_parser.detect_state(&context);
667            let detected_state = detected_result.as_ref().map(|r| semantic_state_to_session_state(r.state));
668
669            // Handle state transitions
670            if let Some(new_state) = detected_state {
671                // Check for trust confirmation during startup (auto-confirm)
672                if let Some(ref result) = detected_result {
673                    if let Some(ref meta) = result.meta {
674                        if meta.needs_trust_confirm == Some(true) {
675                            debug!("Auto-confirming trust dialog");
676                            if let Some(writer) = pty_writer.lock().await.as_mut() {
677                                let _ = writer.write_all(b"\r");
678                            }
679                            continue;
680                        }
681                    }
682                }
683
684                if new_state != current_state {
685                    // Begin turn when entering processing state
686                    if new_state.is_processing() && !current_state.is_processing() {
687                        let mut counter = turn_counter.write().await;
688                        *counter += 1;
689                        *current_turn_id.write().await = Some(*counter);
690                        *stream_seq.write().await = 0;
691                        text_assembler.lock().await.reset();
692                        line_source_by_y.write().await.clear();
693                        assistant_block_active.store(false, Ordering::SeqCst);
694                        debug!(turn_id = *counter, "Begin turn");
695                    }
696
697                    *state.write().await = new_state;
698
699                    // End turn when leaving processing state
700                    if current_state.is_processing() && !new_state.is_processing() {
701                        if let Some(turn_id) = *current_turn_id.read().await {
702                            let content = text_assembler.lock().await.finalize();
703                            let _ = event_tx.send(SessionEvent::TextOutput(
704                                TextOutputEvent::Complete {
705                                    turn_id,
706                                    content,
707                                    timestamp: Utc::now().timestamp_millis(),
708                                },
709                            ));
710                            debug!(turn_id = turn_id, "End turn");
711                        }
712                        *current_turn_id.write().await = None;
713                        *stream_seq.write().await = 0;
714                        text_assembler.lock().await.reset();
715                        line_source_by_y.write().await.clear();
716                        assistant_block_active.store(false, Ordering::SeqCst);
717                    }
718
719                    // Handle confirming state using semantic ConfirmParser
720                    if new_state == SessionState::Confirming {
721                        let semantic_confirm = confirm_parser.detect_confirm(&context);
722                        let confirm_info = semantic_confirm.as_ref().map(convert_semantic_confirm_info);
723                        *pending_tool_confirm.write().await = confirm_info.clone();
724
725                        // Check permission if callback is set
726                        if let Some(ref info) = confirm_info {
727                            let permission = permission_check.read().await;
728                            if let Some(ref check_fn) = *permission {
729                                let decision = check_fn(info);
730                                match decision {
731                                    PermissionDecision::Allow => {
732                                        // Auto-approve
733                                        if let Some(writer) =
734                                            pty_writer.lock().await.as_mut()
735                                        {
736                                            let _ = writer.write_all(b"\r");
737                                        }
738                                        continue;
739                                    }
740                                    PermissionDecision::Deny => {
741                                        // Auto-deny (down, down, enter)
742                                        if let Some(writer) =
743                                            pty_writer.lock().await.as_mut()
744                                        {
745                                            let _ =
746                                                writer.write_all(b"\x1b[B\x1b[B\r");
747                                        }
748                                        continue;
749                                    }
750                                    PermissionDecision::Confirm => {
751                                        // Require manual confirmation
752                                    }
753                                }
754                            }
755                        }
756
757                        let _ = event_tx.send(SessionEvent::ConfirmRequired {
758                            prompt: last_lines.join("\n"),
759                            info: confirm_info,
760                        });
761                    }
762
763                    let _ = state_change_tx.send((new_state, current_state));
764                    let _ = event_tx.send(SessionEvent::StateChange {
765                        new_state,
766                        prev_state: current_state,
767                    });
768                }
769            }
770
771            // Emit StatusUpdate event if spinner is detected
772            if hints.has_spinner {
773                if let Some(status) = status_parser.parse(&context) {
774                    let _ = event_tx.send(SessionEvent::StatusUpdate(status));
775                }
776            }
777
778            // Emit ToolOutput event if tool output is detected
779            if hints.has_tool_output {
780                if let Some(result) = tool_parser.parse(&context) {
781                    let _ = event_tx.send(SessionEvent::ToolOutput(result.data));
782                }
783            }
784
785            // Process stable ops for text streaming
786            if !delta.stable_ops.is_empty() && current_state.is_processing() {
787                let turn_id = *current_turn_id.read().await;
788                if let Some(turn_id) = turn_id {
789                    for op in &delta.stable_ops {
790                        let source = classify_stable_op(op);
791                        if matches!(source, ScreenTextSource::Assistant) {
792                            let chunk = text_assembler.lock().await.apply(op);
793                            if !chunk.is_empty() {
794                                let seq = {
795                                    let mut seq_guard = stream_seq.write().await;
796                                    let s = *seq_guard;
797                                    *seq_guard += 1;
798                                    s
799                                };
800                                let _ =
801                                    event_tx.send(SessionEvent::TextOutput(TextOutputEvent::Stream {
802                                        turn_id,
803                                        seq,
804                                        content: chunk,
805                                        timestamp: Utc::now().timestamp_millis(),
806                                    }));
807                            }
808                        }
809                    }
810                }
811            }
812        }
813    }
814
815    /// Write data to PTY
816    pub async fn write(&self, data: &str) -> Result<()> {
817        if !self.running.load(Ordering::SeqCst) {
818            return Err(anyhow!("Session not running"));
819        }
820
821        let mut writer_guard = self.pty_writer.lock().await;
822        if let Some(ref mut writer) = *writer_guard {
823            writer.write_all(data.as_bytes())?;
824            writer.flush()?;
825            debug!(data_len = data.len(), "Wrote to PTY");
826            Ok(())
827        } else {
828            Err(anyhow!("PTY writer not available"))
829        }
830    }
831
832    /// Send message and wait for response
833    pub async fn send(&self, message: &str, timeout_ms: u64) -> Result<String> {
834        let state = self.state().await;
835        if state != SessionState::Idle {
836            return Err(anyhow!("Cannot send message in state: {:?}", state));
837        }
838
839        // Record user message
840        {
841            let mut history = self.history.write().await;
842            history.push(Message {
843                role: MessageRole::User,
844                content: message.trim().to_string(),
845                timestamp: Utc::now().timestamp_millis(),
846            });
847        }
848
849        // Subscribe to events before sending
850        let mut rx = self.event_tx.subscribe();
851
852        // Send message
853        self.write(message).await?;
854        tokio::time::sleep(Duration::from_millis(100)).await;
855        self.write("\r").await?;
856
857        // Wait for state to leave idle
858        self.wait_for_state_change(SessionState::Idle, Duration::from_secs(30))
859            .await?;
860
861        // Wait for response complete
862        let timeout_duration = Duration::from_millis(timeout_ms);
863        let result = timeout(timeout_duration, async {
864            loop {
865                if let Ok(event) = rx.recv().await {
866                    if let SessionEvent::TextOutput(TextOutputEvent::Complete { content, .. }) =
867                        event
868                    {
869                        return Ok(content);
870                    }
871                    if let SessionEvent::Exit(code) = event {
872                        return Err(anyhow!("Session exited with code: {}", code));
873                    }
874                }
875            }
876        })
877        .await;
878
879        match result {
880            Ok(Ok(response)) => {
881                // Record assistant message
882                {
883                    let mut history = self.history.write().await;
884                    history.push(Message {
885                        role: MessageRole::Assistant,
886                        content: response.clone(),
887                        timestamp: Utc::now().timestamp_millis(),
888                    });
889                }
890                Ok(response)
891            }
892            Ok(Err(e)) => Err(e),
893            Err(_) => Err(anyhow!("Timeout waiting for response")),
894        }
895    }
896
897    /// Send confirmation response
898    pub async fn confirm(&self, response: ConfirmResponse) -> Result<()> {
899        let state = self.state().await;
900        if state != SessionState::Confirming {
901            warn!(state = ?state, "Not in confirming state");
902            return Ok(());
903        }
904
905        let input = match response {
906            ConfirmResponse::Yes => "\r",
907            ConfirmResponse::No => "\x1b[B\x1b[B\r", // down, down, enter
908            ConfirmResponse::Option(n) => {
909                let downs = "\x1b[B".repeat(n.saturating_sub(1));
910                let input = format!("{}\r", downs);
911                self.write(&input).await?;
912                return Ok(());
913            }
914        };
915
916        self.write(input).await
917    }
918
919    /// Send interrupt (Ctrl+C)
920    pub async fn interrupt(&self) -> Result<()> {
921        self.write("\x03").await
922    }
923
924    /// Set permission check callback
925    pub async fn set_permission_check<F>(&self, callback: F)
926    where
927        F: Fn(&ConfirmInfo) -> PermissionDecision + Send + Sync + 'static,
928    {
929        *self.permission_check.write().await = Some(Box::new(callback));
930    }
931
932    /// Subscribe to session events
933    pub fn subscribe(&self) -> broadcast::Receiver<SessionEvent> {
934        self.event_tx.subscribe()
935    }
936
937    /// Subscribe to state changes
938    pub fn subscribe_state_changes(&self) -> broadcast::Receiver<(SessionState, SessionState)> {
939        self.state_change_tx.subscribe()
940    }
941
942    /// Wait for specific state
943    pub async fn wait_for_state(&self, target: SessionState, timeout_duration: Duration) -> Result<()> {
944        let current = self.state().await;
945        if current == target {
946            return Ok(());
947        }
948
949        let mut rx = self.state_change_tx.subscribe();
950
951        timeout(timeout_duration, async {
952            loop {
953                if let Ok((new_state, _)) = rx.recv().await {
954                    if new_state == target {
955                        return Ok(());
956                    }
957                    if matches!(new_state, SessionState::Error | SessionState::Exited) {
958                        return Err(anyhow!(
959                            "Session entered {:?} while waiting for {:?}",
960                            new_state,
961                            target
962                        ));
963                    }
964                }
965            }
966        })
967        .await
968        .map_err(|_| anyhow!("Timeout waiting for state: {:?}", target))?
969    }
970
971    /// Wait for state to change from current
972    async fn wait_for_state_change(
973        &self,
974        current: SessionState,
975        timeout_duration: Duration,
976    ) -> Result<()> {
977        let state = self.state().await;
978        if state != current {
979            return Ok(());
980        }
981
982        let mut rx = self.state_change_tx.subscribe();
983
984        timeout(timeout_duration, async {
985            loop {
986                if let Ok((new_state, _)) = rx.recv().await {
987                    if new_state != current {
988                        return Ok(());
989                    }
990                }
991            }
992        })
993        .await
994        .map_err(|_| anyhow!("Timeout waiting to leave state: {:?}", current))?
995    }
996
997    /// Close session gracefully
998    pub async fn close(&mut self) -> Result<()> {
999        if !self.running.load(Ordering::SeqCst) {
1000            return Ok(());
1001        }
1002
1003        info!("Closing PTY session");
1004
1005        // Try graceful exit
1006        let _ = self.write("/exit\r").await;
1007
1008        // Wait for exit or timeout
1009        let timeout_result = timeout(Duration::from_secs(3), async {
1010            let mut rx = self.event_tx.subscribe();
1011            loop {
1012                if let Ok(SessionEvent::Exit(_)) = rx.recv().await {
1013                    break;
1014                }
1015            }
1016        })
1017        .await;
1018
1019        if timeout_result.is_err() {
1020            // Force kill
1021            self.kill().await;
1022        }
1023
1024        Ok(())
1025    }
1026
1027    /// Force kill session
1028    pub async fn kill(&mut self) {
1029        self.running.store(false, Ordering::SeqCst);
1030        if let Some(tx) = self.shutdown_tx.take() {
1031            let _ = tx.send(());
1032        }
1033        *self.pty_writer.lock().await = None;
1034        info!("PTY session killed");
1035    }
1036}
1037
1038/// Confirmation response types
1039pub enum ConfirmResponse {
1040    Yes,
1041    No,
1042    Option(usize),
1043}
1044
1045// ========== Helper Functions ==========
1046
1047/// Convert semantic State to SessionState
1048fn semantic_state_to_session_state(state: SemanticState) -> SessionState {
1049    match state {
1050        SemanticState::Starting => SessionState::Starting,
1051        SemanticState::Idle => SessionState::Idle,
1052        SemanticState::Thinking => SessionState::Thinking,
1053        SemanticState::ToolRunning => SessionState::ToolRunning,
1054        SemanticState::Confirming => SessionState::Confirming,
1055        SemanticState::Error => SessionState::Error,
1056    }
1057}
1058
1059/// Convert SessionState to semantic State
1060fn current_state_to_semantic(state: SessionState) -> SemanticState {
1061    match state {
1062        SessionState::Starting => SemanticState::Starting,
1063        SessionState::Idle => SemanticState::Idle,
1064        SessionState::Thinking => SemanticState::Thinking,
1065        SessionState::Responding => SemanticState::Thinking, // No direct mapping
1066        SessionState::ToolRunning => SemanticState::ToolRunning,
1067        SessionState::Confirming => SemanticState::Confirming,
1068        SessionState::Error => SemanticState::Error,
1069        SessionState::Exited => SemanticState::Idle, // No direct mapping
1070    }
1071}
1072
1073/// Convert semantic ConfirmInfo to session ConfirmInfo
1074fn convert_semantic_confirm_info(info: &SemanticConfirmInfo) -> ConfirmInfo {
1075    let options: Vec<String> = info
1076        .options
1077        .as_ref()
1078        .map(|opts| opts.iter().map(|o| o.label.clone()).collect())
1079        .unwrap_or_default();
1080
1081    let tool = info.tool.as_ref().map(|t| ToolInfo {
1082        name: t.name.clone(),
1083        mcp_server: t.mcp_server.clone(),
1084        params: t
1085            .params
1086            .iter()
1087            .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
1088            .collect(),
1089    });
1090
1091    ConfirmInfo {
1092        confirm_type: format!("{:?}", info.confirm_type).to_lowercase(),
1093        tool,
1094        options,
1095        selected: 0, // Default to first option selected
1096    }
1097}
1098
1099/// Classify stable op source
1100fn classify_stable_op(op: &StableTextOp) -> ScreenTextSource {
1101    let text = op.text();
1102    let trimmed = text.trim_start();
1103
1104    // Prompt line = user input
1105    if trimmed.starts_with('>') || trimmed.starts_with('❯') {
1106        return ScreenTextSource::User;
1107    }
1108
1109    // Tool output markers
1110    if trimmed.starts_with('⎿') || trimmed.starts_with('│') {
1111        return ScreenTextSource::Tool;
1112    }
1113
1114    // Tool call header
1115    if trimmed.starts_with('⏺') {
1116        // Check if it's a tool call (has parameters) or assistant text
1117        if trimmed.contains('(') && !trimmed.contains("completed") {
1118            return ScreenTextSource::Tool;
1119        }
1120        return ScreenTextSource::Assistant;
1121    }
1122
1123    // UI elements
1124    if trimmed.contains("ctrl+")
1125        || trimmed.contains("Ctrl+")
1126        || trimmed.contains("IDE disconnected")
1127    {
1128        return ScreenTextSource::Ui;
1129    }
1130
1131    // Box drawing = UI
1132    if trimmed
1133        .chars()
1134        .any(|c| matches!(c, '╭' | '╮' | '╯' | '╰' | '┌' | '┐' | '└' | '┘' | '─' | '━' | '═'))
1135    {
1136        return ScreenTextSource::Ui;
1137    }
1138
1139    ScreenTextSource::Unknown
1140}