1use std::collections::HashMap;
10use std::io::{Read, Write as IoWrite};
11use std::path::PathBuf;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use alacritty_terminal::event::{Event as TermEvent, EventListener};
17use alacritty_terminal::grid::Dimensions;
18use alacritty_terminal::term::{Config as TermConfig, Term};
19use anyhow::{anyhow, Result};
20
/// Fixed terminal dimensions handed to `Term::new` when the emulator is created.
struct TermSize {
    /// Number of columns.
    cols: usize,
    /// Number of rows.
    rows: usize,
}
26
/// Lets a plain `TermSize` describe the grid geometry to `alacritty_terminal`.
impl Dimensions for TermSize {
    /// Reports `rows` as the total line count (the same value as `screen_lines`).
    fn total_lines(&self) -> usize {
        self.rows
    }

    /// Number of visible screen lines, i.e. `rows`.
    fn screen_lines(&self) -> usize {
        self.rows
    }

    /// Number of columns, i.e. `cols`.
    fn columns(&self) -> usize {
        self.cols
    }
}
40use chrono::Utc;
41use portable_pty::{native_pty_system, CommandBuilder, PtySize};
42use serde::{Deserialize, Serialize};
43use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock};
44use tokio::time::{interval, timeout};
45use tracing::{debug, error, info, warn};
46use uuid::Uuid;
47
48use super::extractor::{IncrementalExtractor, StableTextOp, TextAssembler};
49use crate::semantic::{
50 ClaudeCodeConfirmParser, ClaudeCodeStateParser, ClaudeCodeStatusParser,
51 ClaudeCodeToolOutputParser,
52 ConfirmParser, ParserContext, StateParser, StatusParser, ToolOutputParser,
53 default_registry,
54 ClaudeCodeStatus, ClaudeCodeToolOutput, ClaudeCodeTitle,
55 ConfirmInfo as SemanticConfirmInfo,
56 State as SemanticState,
57};
58
/// Activity state of a PTY session.
///
/// Serialized with `snake_case` variant names (for example `tool_running`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SessionState {
    /// Initial state assigned by `PTYSession::new`.
    Starting,
    /// Not processing; `PTYSession::send` only accepts a message in this state.
    Idle,
    /// Processing state (see `is_processing`).
    Thinking,
    /// Processing state (see `is_processing`).
    Responding,
    /// Processing state (see `is_processing`).
    ToolRunning,
    /// A confirmation prompt was detected; `PTYSession::confirm` only acts in this state.
    Confirming,
    /// Error state; `wait_for_state` fails when it observes a transition into it.
    Error,
    /// Set by the exit watcher once the child process has finished.
    Exited,
}
82
83impl SessionState {
84 pub fn is_processing(&self) -> bool {
86 matches!(
87 self,
88 SessionState::Thinking | SessionState::ToolRunning | SessionState::Responding
89 )
90 }
91}
92
/// One entry of the conversation history kept by a session.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
    /// Who produced the message.
    pub role: MessageRole,
    /// Message text.
    pub content: String,
    /// Creation time; `PTYSession::send` fills it from `Utc::now().timestamp_millis()`.
    pub timestamp: i64,
}
100
/// Author of a history `Message`; serialized in lowercase (`user` / `assistant`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum MessageRole {
    /// Text submitted through `PTYSession::send`.
    User,
    /// Completed response text recorded by `PTYSession::send`.
    Assistant,
}
107
/// Classification of a line of screen text, as produced by `classify_stable_op`.
///
/// Serialized with `snake_case` variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ScreenTextSource {
    /// A `⏺` line that is not recognised as a tool invocation.
    Assistant,
    /// A line starting with `>` or `❯`.
    User,
    /// A line starting with `⎿` or `│`, or a `⏺` line that looks like a tool call.
    Tool,
    /// Key-binding hints, the "IDE disconnected" notice, or box-drawing borders.
    Ui,
    /// Anything that matched none of the rules above.
    Unknown,
}
118
/// Assistant text extracted for a turn.
///
/// Serialized as an internally tagged object: a `type` field holds the
/// `snake_case` variant name (`stream` / `complete`).
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TextOutputEvent {
    /// Incremental chunk emitted while a turn is being processed.
    Stream {
        /// Turn this chunk belongs to.
        turn_id: u64,
        /// Per-turn sequence number; starts at 0 and increases by one per chunk.
        seq: u64,
        /// Newly assembled text.
        content: String,
        /// Emission time from `Utc::now().timestamp_millis()`.
        timestamp: i64,
    },
    /// Final text of a turn, emitted when the session leaves a processing state.
    Complete {
        /// Turn that finished.
        turn_id: u64,
        /// Finalized text returned by the text assembler.
        content: String,
        /// Emission time from `Utc::now().timestamp_millis()`.
        timestamp: i64,
    },
}
135
/// A classified piece of screen text.
///
/// NOTE(review): nothing in this part of the file constructs this type, so the
/// field descriptions below are inferred from names and types — confirm against
/// the producer.
#[derive(Debug, Clone, Serialize)]
pub struct ScreenTextEvent {
    /// Classification of the text.
    pub source: ScreenTextSource,
    /// Free-form kind label (presumably the operation type — TODO confirm).
    pub kind: String,
    /// Screen row the text belongs to (presumably a grid line index — TODO confirm).
    pub y: usize,
    /// The text itself.
    pub content: String,
    /// Event time (presumably epoch milliseconds like the other events — TODO confirm).
    pub timestamp: i64,
    /// Turn the text was observed in, if any.
    pub turn_id: Option<u64>,
}
146
/// Description of the tool a confirmation prompt refers to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolInfo {
    /// Tool name.
    pub name: String,
    /// MCP server providing the tool, when one is reported.
    pub mcp_server: Option<String>,
    /// Tool parameters; `convert_semantic_confirm_info` stores every value as a JSON string.
    pub params: HashMap<String, serde_json::Value>,
}
154
/// Details of a confirmation prompt detected on screen.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfirmInfo {
    /// Kind of confirmation; serialized under the key `type`.
    /// `convert_semantic_confirm_info` fills it with the lowercased `Debug` name
    /// of the semantic confirm type.
    #[serde(rename = "type")]
    pub confirm_type: String,
    /// Tool the prompt is about, if one was recognised.
    pub tool: Option<ToolInfo>,
    /// Labels of the selectable options, in the order reported by the parser.
    pub options: Vec<String>,
    /// Index of the highlighted option; `convert_semantic_confirm_info` always sets 0.
    pub selected: usize,
}
164
/// Verdict returned by the permission callback installed with
/// `PTYSession::set_permission_check`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PermissionDecision {
    /// Accept automatically: the state loop writes Enter (`\r`) to the PTY.
    Allow,
    /// Reject automatically: the state loop writes Down, Down, Enter to the PTY.
    Deny,
    /// Do not answer automatically; a `ConfirmRequired` event is emitted instead.
    Confirm,
}
175
/// Construction parameters for `PTYSession::new`.
#[derive(Debug, Clone)]
pub struct PTYSessionOptions {
    /// Identifier of the slot this session occupies.
    pub slot_id: String,
    /// Working directory of the child process; also passed to `claude --add-dir`.
    pub cwd: PathBuf,
    /// Extra environment variables.
    /// NOTE(review): not read anywhere in the visible code — confirm whether it is applied.
    pub env: Option<HashMap<String, String>>,
    /// Optional log file path; stored on the session.
    pub log_file: Option<PathBuf>,
    /// Terminal width in columns.
    pub cols: u16,
    /// Terminal height in rows.
    pub rows: u16,
}
186
187impl Default for PTYSessionOptions {
188 fn default() -> Self {
189 Self {
190 slot_id: Uuid::new_v4().to_string(),
191 cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/")),
192 env: None,
193 log_file: None,
194 cols: 120,
195 rows: 30,
196 }
197 }
198}
199
200struct SessionEventListener {
202 sender: mpsc::UnboundedSender<TermEvent>,
203}
204
205impl EventListener for SessionEventListener {
206 fn send_event(&self, event: TermEvent) {
207 let _ = self.sender.send(event);
208 }
209}
210
/// A Claude Code process running inside a pseudo-terminal, together with the
/// shared state used by its background tasks (reader, state checker, exit watcher).
pub struct PTYSession {
    /// Session id of the form `pty-<epoch millis>-<first 8 chars of a UUID>`.
    pub id: String,
    /// Slot identifier copied from the options.
    pub slot_id: String,
    /// Working directory of the child process.
    pub cwd: PathBuf,
    /// Terminal width in columns.
    pub cols: u16,
    /// Terminal height in rows.
    pub rows: u16,

    /// Current session state; written by the state checker and the exit watcher.
    state: Arc<RwLock<SessionState>>,
    /// Conversation history appended to by `send`.
    history: Arc<RwLock<Vec<Message>>>,
    /// Terminal title; starts empty (no writer is visible in this part of the file).
    terminal_title: Arc<RwLock<String>>,
    /// Confirmation details recorded when the session enters `Confirming`.
    pending_tool_confirm: Arc<RwLock<Option<ConfirmInfo>>>,
    /// Optional callback consulted to auto-answer confirmation prompts.
    permission_check: Arc<RwLock<Option<Box<dyn Fn(&ConfirmInfo) -> PermissionDecision + Send + Sync>>>>,

    /// Write half of the PTY; `None` before `start` and after `kill`.
    pty_writer: Arc<Mutex<Option<Box<dyn IoWrite + Send>>>>,
    /// Process id of the spawned child, set by `start`.
    pty_pid: Arc<RwLock<Option<u32>>>,
    /// `true` between a successful spawn and exit/kill; polled by the background loops.
    running: Arc<AtomicBool>,

    /// Terminal emulator whose grid is read for screen text and state detection.
    term: Arc<Mutex<Term<SessionEventListener>>>,

    /// Incremental extractor run against the emulator on every state-check tick.
    extractor: Arc<Mutex<IncrementalExtractor>>,
    /// Accumulates assistant text for the current turn.
    text_assembler: Arc<Mutex<TextAssembler>>,
    /// Id of the turn in progress, `None` when no turn is active.
    current_turn_id: Arc<RwLock<Option<u64>>>,
    /// Next sequence number for `TextOutputEvent::Stream` within the current turn.
    stream_seq: Arc<RwLock<u64>>,
    /// Monotonic counter used to allocate turn ids.
    turn_counter: Arc<RwLock<u64>>,
    /// Per-row source classification; only cleared at turn boundaries in the visible code.
    line_source_by_y: Arc<RwLock<HashMap<usize, ScreenTextSource>>>,
    /// Assistant-block flag; only reset to `false` at turn boundaries in the visible code.
    assistant_block_active: Arc<AtomicBool>,

    /// Broadcast channel for all `SessionEvent`s (capacity 1000).
    event_tx: broadcast::Sender<SessionEvent>,
    /// Broadcast channel for `(new_state, prev_state)` pairs (capacity 100).
    state_change_tx: broadcast::Sender<(SessionState, SessionState)>,
    /// Shutdown signal for the reader task; created by `start`, consumed by `kill`.
    shutdown_tx: Option<oneshot::Sender<()>>,

    /// Optional log file path copied from the options.
    log_file: Option<PathBuf>,
}
260
/// Events broadcast to subscribers obtained from `PTYSession::subscribe`.
#[derive(Debug, Clone)]
pub enum SessionEvent {
    /// Raw bytes read from the PTY.
    Data(Vec<u8>),
    /// The detected session state changed.
    StateChange {
        /// State after the transition.
        new_state: SessionState,
        /// State before the transition.
        prev_state: SessionState,
    },
    /// Streamed or completed assistant text.
    TextOutput(TextOutputEvent),
    /// Classified screen text (not emitted by the code visible here).
    ScreenText(ScreenTextEvent),
    /// A confirmation prompt needs an answer from the caller.
    ConfirmRequired {
        /// The most recent screen lines, joined with newlines.
        prompt: String,
        /// Parsed details of the prompt, when the parser recognised it.
        info: Option<ConfirmInfo>,
    },
    /// Status parsed from the screen while a spinner is shown.
    StatusUpdate(ClaudeCodeStatus),
    /// Tool output parsed from the screen.
    ToolOutput(ClaudeCodeToolOutput),
    /// Terminal title change (not emitted by the code visible here).
    TitleChange(ClaudeCodeTitle),
    /// The child process exited with this code (`-1` when the status is unavailable).
    Exit(i32),
}
289
290impl PTYSession {
291 pub fn new(options: PTYSessionOptions) -> Result<Self> {
293 let id = format!(
294 "pty-{}-{}",
295 Utc::now().timestamp_millis(),
296 &Uuid::new_v4().to_string()[..8]
297 );
298
299 let (term_event_tx, _term_event_rx) = mpsc::unbounded_channel();
301 let event_listener = SessionEventListener {
302 sender: term_event_tx,
303 };
304
305 let term_config = TermConfig::default();
307 let term_size = TermSize {
308 cols: options.cols as usize,
309 rows: options.rows as usize,
310 };
311 let term = Term::new(term_config, &term_size, event_listener);
312
313 let (event_tx, _) = broadcast::channel(1000);
315 let (state_change_tx, _) = broadcast::channel(100);
316
317 Ok(Self {
318 id,
319 slot_id: options.slot_id,
320 cwd: options.cwd,
321 cols: options.cols,
322 rows: options.rows,
323
324 state: Arc::new(RwLock::new(SessionState::Starting)),
325 history: Arc::new(RwLock::new(Vec::new())),
326 terminal_title: Arc::new(RwLock::new(String::new())),
327 pending_tool_confirm: Arc::new(RwLock::new(None)),
328 permission_check: Arc::new(RwLock::new(None)),
329
330 pty_writer: Arc::new(Mutex::new(None)),
331 pty_pid: Arc::new(RwLock::new(None)),
332 running: Arc::new(AtomicBool::new(false)),
333
334 term: Arc::new(Mutex::new(term)),
335 extractor: Arc::new(Mutex::new(IncrementalExtractor::new(
336 options.rows as usize,
337 None,
338 ))),
339 text_assembler: Arc::new(Mutex::new(TextAssembler::new())),
340 current_turn_id: Arc::new(RwLock::new(None)),
341 stream_seq: Arc::new(RwLock::new(0)),
342 turn_counter: Arc::new(RwLock::new(0)),
343 line_source_by_y: Arc::new(RwLock::new(HashMap::new())),
344 assistant_block_active: Arc::new(AtomicBool::new(false)),
345
346 event_tx,
347 state_change_tx,
348 shutdown_tx: None,
349 log_file: options.log_file,
350 })
351 }
352
353 pub async fn state(&self) -> SessionState {
357 *self.state.read().await
358 }
359
360 pub async fn history(&self) -> Vec<Message> {
362 self.history.read().await.clone()
363 }
364
365 pub fn is_running(&self) -> bool {
367 self.running.load(Ordering::SeqCst)
368 }
369
370 pub async fn pid(&self) -> Option<u32> {
372 *self.pty_pid.read().await
373 }
374
375 pub async fn pending_tool_confirm(&self) -> Option<ConfirmInfo> {
377 self.pending_tool_confirm.read().await.clone()
378 }
379
380 pub async fn terminal_title(&self) -> String {
382 self.terminal_title.read().await.clone()
383 }
384
385 pub async fn get_screen_text(&self) -> String {
389 let term = self.term.lock().await;
390 let grid = term.grid();
391 let mut lines = Vec::new();
392
393 let total_lines = grid.total_lines();
394 let rows = grid.screen_lines();
395 let start = if total_lines > rows {
396 total_lines - rows
397 } else {
398 0
399 };
400
401 for y in start..total_lines {
402 let line = alacritty_terminal::index::Line(y as i32);
403 if y < grid.total_lines() {
404 let row = &grid[line];
405 let text: String = row.into_iter().map(|cell| cell.c).collect();
406 lines.push(text.trim_end().to_string());
407 }
408 }
409
410 lines.join("\n")
411 }
412
413 pub async fn get_last_lines(&self, n: usize) -> Vec<String> {
415 let term = self.term.lock().await;
416 let grid = term.grid();
417 let mut lines = Vec::new();
418
419 let total_lines = grid.total_lines();
420 let start = if total_lines > n { total_lines - n } else { 0 };
421
422 for y in start..total_lines {
423 let line = alacritty_terminal::index::Line(y as i32);
424 if y < grid.total_lines() {
425 let row = &grid[line];
426 let text: String = row.into_iter().map(|cell| cell.c).collect();
427 lines.push(text.trim_end().to_string());
428 }
429 }
430
431 lines
432 }
433
434 pub async fn start(&mut self) -> Result<()> {
438 if self.running.load(Ordering::SeqCst) {
439 return Err(anyhow!("Session already started"));
440 }
441
442 info!(slot_id = %self.slot_id, cwd = %self.cwd.display(), "Starting PTY session");
443
444 let pty_system = native_pty_system();
446 let pty_pair = pty_system.openpty(PtySize {
447 rows: self.rows,
448 cols: self.cols,
449 pixel_width: 0,
450 pixel_height: 0,
451 })?;
452
453 let mut cmd = CommandBuilder::new("zsh");
455 cmd.args([
456 "-l",
457 "-c",
458 &format!("claude --add-dir \"{}\"", self.cwd.display()),
459 ]);
460 cmd.cwd(&self.cwd);
461
462 cmd.env("TERM", "xterm-256color");
464 if let Ok(path) = std::env::var("PATH") {
465 cmd.env("PATH", path);
466 }
467 if let Ok(home) = std::env::var("HOME") {
468 cmd.env("HOME", home);
469 }
470
471 let mut child = pty_pair.slave.spawn_command(cmd)?;
473 let pid = child.process_id().unwrap_or(0);
474 *self.pty_pid.write().await = Some(pid);
475 info!(pid = pid, "PTY spawned");
476
477 let writer = pty_pair.master.take_writer()?;
479 *self.pty_writer.lock().await = Some(writer);
480
481 let reader = pty_pair.master.try_clone_reader()?;
483
484 self.running.store(true, Ordering::SeqCst);
485
486 let (shutdown_tx, shutdown_rx) = oneshot::channel();
488 self.shutdown_tx = Some(shutdown_tx);
489
490 let term = Arc::clone(&self.term);
492 let event_tx = self.event_tx.clone();
493 let running = Arc::clone(&self.running);
494
495 tokio::spawn(async move {
496 Self::read_loop(reader, term, event_tx, running, shutdown_rx).await;
497 });
498
499 let session_state = Arc::clone(&self.state);
501 let term_for_check = Arc::clone(&self.term);
502 let extractor = Arc::clone(&self.extractor);
503 let text_assembler = Arc::clone(&self.text_assembler);
504 let current_turn = Arc::clone(&self.current_turn_id);
505 let stream_seq = Arc::clone(&self.stream_seq);
506 let turn_counter = Arc::clone(&self.turn_counter);
507 let line_source = Arc::clone(&self.line_source_by_y);
508 let assistant_active = Arc::clone(&self.assistant_block_active);
509 let state_change_tx = self.state_change_tx.clone();
510 let event_tx_for_check = self.event_tx.clone();
511 let running_for_check = Arc::clone(&self.running);
512 let pending_confirm = Arc::clone(&self.pending_tool_confirm);
513 let permission_check = Arc::clone(&self.permission_check);
514 let pty_writer = Arc::clone(&self.pty_writer);
515
516 tokio::spawn(async move {
517 Self::state_check_loop(
518 session_state,
519 term_for_check,
520 extractor,
521 text_assembler,
522 current_turn,
523 stream_seq,
524 turn_counter,
525 line_source,
526 assistant_active,
527 state_change_tx,
528 event_tx_for_check,
529 running_for_check,
530 pending_confirm,
531 permission_check,
532 pty_writer,
533 )
534 .await;
535 });
536
537 let event_tx_for_exit = self.event_tx.clone();
539 let running_for_exit = Arc::clone(&self.running);
540 let state_for_exit = Arc::clone(&self.state);
541
542 tokio::spawn(async move {
543 let exit_status = tokio::task::spawn_blocking(move || child.wait())
545 .await
546 .ok()
547 .and_then(|r| r.ok());
548
549 let exit_code = exit_status
550 .map(|s| s.exit_code() as i32)
551 .unwrap_or(-1);
552
553 running_for_exit.store(false, Ordering::SeqCst);
554 *state_for_exit.write().await = SessionState::Exited;
555
556 let _ = event_tx_for_exit.send(SessionEvent::Exit(exit_code));
557 info!(exit_code = exit_code, "PTY exited");
558 });
559
560 self.wait_for_state(SessionState::Idle, Duration::from_secs(60))
562 .await?;
563
564 Ok(())
565 }
566
567 async fn read_loop(
569 reader: Box<dyn Read + Send>,
570 _term: Arc<Mutex<Term<SessionEventListener>>>,
571 event_tx: broadcast::Sender<SessionEvent>,
572 running: Arc<AtomicBool>,
573 _shutdown_rx: oneshot::Receiver<()>,
574 ) {
575 let running_clone = Arc::clone(&running);
577
578 tokio::task::spawn_blocking(move || {
579 let mut reader = reader;
580 let mut buf = [0u8; 4096];
581
582 while running_clone.load(Ordering::SeqCst) {
583 match reader.read(&mut buf) {
584 Ok(0) => break, Ok(n) => {
586 let data = buf[..n].to_vec();
587 let _ = event_tx.send(SessionEvent::Data(data));
591 }
592 Err(e) => {
593 error!(error = %e, "PTY read error");
594 break;
595 }
596 }
597 }
598 });
599 }
600
601 #[allow(clippy::too_many_arguments)]
603 async fn state_check_loop(
604 state: Arc<RwLock<SessionState>>,
605 term: Arc<Mutex<Term<SessionEventListener>>>,
606 extractor: Arc<Mutex<IncrementalExtractor>>,
607 text_assembler: Arc<Mutex<TextAssembler>>,
608 current_turn_id: Arc<RwLock<Option<u64>>>,
609 stream_seq: Arc<RwLock<u64>>,
610 turn_counter: Arc<RwLock<u64>>,
611 line_source_by_y: Arc<RwLock<HashMap<usize, ScreenTextSource>>>,
612 assistant_block_active: Arc<AtomicBool>,
613 state_change_tx: broadcast::Sender<(SessionState, SessionState)>,
614 event_tx: broadcast::Sender<SessionEvent>,
615 running: Arc<AtomicBool>,
616 pending_tool_confirm: Arc<RwLock<Option<ConfirmInfo>>>,
617 permission_check: Arc<RwLock<Option<Box<dyn Fn(&ConfirmInfo) -> PermissionDecision + Send + Sync>>>>,
618 pty_writer: Arc<Mutex<Option<Box<dyn IoWrite + Send>>>>,
619 ) {
620 let mut check_interval = interval(Duration::from_millis(100));
621
622 let state_parser = ClaudeCodeStateParser::new();
624 let confirm_parser = ClaudeCodeConfirmParser::new();
625 let status_parser = ClaudeCodeStatusParser::new();
626 let tool_parser = ClaudeCodeToolOutputParser::new();
627 let fingerprint_registry = default_registry();
628
629 while running.load(Ordering::SeqCst) {
630 check_interval.tick().await;
631
632 let delta = {
634 let term_guard = term.lock().await;
635 let mut extractor_guard = extractor.lock().await;
636 extractor_guard.extract(&*term_guard)
637 };
638
639 let last_lines = {
641 let term_guard = term.lock().await;
642 let grid = term_guard.grid();
643 let mut lines = Vec::new();
644 let total = grid.total_lines();
645 let start = if total > 20 { total - 20 } else { 0 };
646 for y in start..total {
647 let line = alacritty_terminal::index::Line(y as i32);
648 if y < total {
649 let row = &grid[line];
650 let text: String = row.into_iter().map(|cell| cell.c).collect();
651 lines.push(text.trim_end().to_string());
652 }
653 }
654 lines
655 };
656
657 let current_state = *state.read().await;
659 let context = ParserContext::new(last_lines.clone())
660 .with_state(current_state_to_semantic(current_state));
661
662 let hints = fingerprint_registry.extract(&context).hints;
664
665 let detected_result = state_parser.detect_state(&context);
667 let detected_state = detected_result.as_ref().map(|r| semantic_state_to_session_state(r.state));
668
669 if let Some(new_state) = detected_state {
671 if let Some(ref result) = detected_result {
673 if let Some(ref meta) = result.meta {
674 if meta.needs_trust_confirm == Some(true) {
675 debug!("Auto-confirming trust dialog");
676 if let Some(writer) = pty_writer.lock().await.as_mut() {
677 let _ = writer.write_all(b"\r");
678 }
679 continue;
680 }
681 }
682 }
683
684 if new_state != current_state {
685 if new_state.is_processing() && !current_state.is_processing() {
687 let mut counter = turn_counter.write().await;
688 *counter += 1;
689 *current_turn_id.write().await = Some(*counter);
690 *stream_seq.write().await = 0;
691 text_assembler.lock().await.reset();
692 line_source_by_y.write().await.clear();
693 assistant_block_active.store(false, Ordering::SeqCst);
694 debug!(turn_id = *counter, "Begin turn");
695 }
696
697 *state.write().await = new_state;
698
699 if current_state.is_processing() && !new_state.is_processing() {
701 if let Some(turn_id) = *current_turn_id.read().await {
702 let content = text_assembler.lock().await.finalize();
703 let _ = event_tx.send(SessionEvent::TextOutput(
704 TextOutputEvent::Complete {
705 turn_id,
706 content,
707 timestamp: Utc::now().timestamp_millis(),
708 },
709 ));
710 debug!(turn_id = turn_id, "End turn");
711 }
712 *current_turn_id.write().await = None;
713 *stream_seq.write().await = 0;
714 text_assembler.lock().await.reset();
715 line_source_by_y.write().await.clear();
716 assistant_block_active.store(false, Ordering::SeqCst);
717 }
718
719 if new_state == SessionState::Confirming {
721 let semantic_confirm = confirm_parser.detect_confirm(&context);
722 let confirm_info = semantic_confirm.as_ref().map(convert_semantic_confirm_info);
723 *pending_tool_confirm.write().await = confirm_info.clone();
724
725 if let Some(ref info) = confirm_info {
727 let permission = permission_check.read().await;
728 if let Some(ref check_fn) = *permission {
729 let decision = check_fn(info);
730 match decision {
731 PermissionDecision::Allow => {
732 if let Some(writer) =
734 pty_writer.lock().await.as_mut()
735 {
736 let _ = writer.write_all(b"\r");
737 }
738 continue;
739 }
740 PermissionDecision::Deny => {
741 if let Some(writer) =
743 pty_writer.lock().await.as_mut()
744 {
745 let _ =
746 writer.write_all(b"\x1b[B\x1b[B\r");
747 }
748 continue;
749 }
750 PermissionDecision::Confirm => {
751 }
753 }
754 }
755 }
756
757 let _ = event_tx.send(SessionEvent::ConfirmRequired {
758 prompt: last_lines.join("\n"),
759 info: confirm_info,
760 });
761 }
762
763 let _ = state_change_tx.send((new_state, current_state));
764 let _ = event_tx.send(SessionEvent::StateChange {
765 new_state,
766 prev_state: current_state,
767 });
768 }
769 }
770
771 if hints.has_spinner {
773 if let Some(status) = status_parser.parse(&context) {
774 let _ = event_tx.send(SessionEvent::StatusUpdate(status));
775 }
776 }
777
778 if hints.has_tool_output {
780 if let Some(result) = tool_parser.parse(&context) {
781 let _ = event_tx.send(SessionEvent::ToolOutput(result.data));
782 }
783 }
784
785 if !delta.stable_ops.is_empty() && current_state.is_processing() {
787 let turn_id = *current_turn_id.read().await;
788 if let Some(turn_id) = turn_id {
789 for op in &delta.stable_ops {
790 let source = classify_stable_op(op);
791 if matches!(source, ScreenTextSource::Assistant) {
792 let chunk = text_assembler.lock().await.apply(op);
793 if !chunk.is_empty() {
794 let seq = {
795 let mut seq_guard = stream_seq.write().await;
796 let s = *seq_guard;
797 *seq_guard += 1;
798 s
799 };
800 let _ =
801 event_tx.send(SessionEvent::TextOutput(TextOutputEvent::Stream {
802 turn_id,
803 seq,
804 content: chunk,
805 timestamp: Utc::now().timestamp_millis(),
806 }));
807 }
808 }
809 }
810 }
811 }
812 }
813 }
814
815 pub async fn write(&self, data: &str) -> Result<()> {
817 if !self.running.load(Ordering::SeqCst) {
818 return Err(anyhow!("Session not running"));
819 }
820
821 let mut writer_guard = self.pty_writer.lock().await;
822 if let Some(ref mut writer) = *writer_guard {
823 writer.write_all(data.as_bytes())?;
824 writer.flush()?;
825 debug!(data_len = data.len(), "Wrote to PTY");
826 Ok(())
827 } else {
828 Err(anyhow!("PTY writer not available"))
829 }
830 }
831
832 pub async fn send(&self, message: &str, timeout_ms: u64) -> Result<String> {
834 let state = self.state().await;
835 if state != SessionState::Idle {
836 return Err(anyhow!("Cannot send message in state: {:?}", state));
837 }
838
839 {
841 let mut history = self.history.write().await;
842 history.push(Message {
843 role: MessageRole::User,
844 content: message.trim().to_string(),
845 timestamp: Utc::now().timestamp_millis(),
846 });
847 }
848
849 let mut rx = self.event_tx.subscribe();
851
852 self.write(message).await?;
854 tokio::time::sleep(Duration::from_millis(100)).await;
855 self.write("\r").await?;
856
857 self.wait_for_state_change(SessionState::Idle, Duration::from_secs(30))
859 .await?;
860
861 let timeout_duration = Duration::from_millis(timeout_ms);
863 let result = timeout(timeout_duration, async {
864 loop {
865 if let Ok(event) = rx.recv().await {
866 if let SessionEvent::TextOutput(TextOutputEvent::Complete { content, .. }) =
867 event
868 {
869 return Ok(content);
870 }
871 if let SessionEvent::Exit(code) = event {
872 return Err(anyhow!("Session exited with code: {}", code));
873 }
874 }
875 }
876 })
877 .await;
878
879 match result {
880 Ok(Ok(response)) => {
881 {
883 let mut history = self.history.write().await;
884 history.push(Message {
885 role: MessageRole::Assistant,
886 content: response.clone(),
887 timestamp: Utc::now().timestamp_millis(),
888 });
889 }
890 Ok(response)
891 }
892 Ok(Err(e)) => Err(e),
893 Err(_) => Err(anyhow!("Timeout waiting for response")),
894 }
895 }
896
897 pub async fn confirm(&self, response: ConfirmResponse) -> Result<()> {
899 let state = self.state().await;
900 if state != SessionState::Confirming {
901 warn!(state = ?state, "Not in confirming state");
902 return Ok(());
903 }
904
905 let input = match response {
906 ConfirmResponse::Yes => "\r",
907 ConfirmResponse::No => "\x1b[B\x1b[B\r", ConfirmResponse::Option(n) => {
909 let downs = "\x1b[B".repeat(n.saturating_sub(1));
910 let input = format!("{}\r", downs);
911 self.write(&input).await?;
912 return Ok(());
913 }
914 };
915
916 self.write(input).await
917 }
918
919 pub async fn interrupt(&self) -> Result<()> {
921 self.write("\x03").await
922 }
923
924 pub async fn set_permission_check<F>(&self, callback: F)
926 where
927 F: Fn(&ConfirmInfo) -> PermissionDecision + Send + Sync + 'static,
928 {
929 *self.permission_check.write().await = Some(Box::new(callback));
930 }
931
932 pub fn subscribe(&self) -> broadcast::Receiver<SessionEvent> {
934 self.event_tx.subscribe()
935 }
936
937 pub fn subscribe_state_changes(&self) -> broadcast::Receiver<(SessionState, SessionState)> {
939 self.state_change_tx.subscribe()
940 }
941
942 pub async fn wait_for_state(&self, target: SessionState, timeout_duration: Duration) -> Result<()> {
944 let current = self.state().await;
945 if current == target {
946 return Ok(());
947 }
948
949 let mut rx = self.state_change_tx.subscribe();
950
951 timeout(timeout_duration, async {
952 loop {
953 if let Ok((new_state, _)) = rx.recv().await {
954 if new_state == target {
955 return Ok(());
956 }
957 if matches!(new_state, SessionState::Error | SessionState::Exited) {
958 return Err(anyhow!(
959 "Session entered {:?} while waiting for {:?}",
960 new_state,
961 target
962 ));
963 }
964 }
965 }
966 })
967 .await
968 .map_err(|_| anyhow!("Timeout waiting for state: {:?}", target))?
969 }
970
971 async fn wait_for_state_change(
973 &self,
974 current: SessionState,
975 timeout_duration: Duration,
976 ) -> Result<()> {
977 let state = self.state().await;
978 if state != current {
979 return Ok(());
980 }
981
982 let mut rx = self.state_change_tx.subscribe();
983
984 timeout(timeout_duration, async {
985 loop {
986 if let Ok((new_state, _)) = rx.recv().await {
987 if new_state != current {
988 return Ok(());
989 }
990 }
991 }
992 })
993 .await
994 .map_err(|_| anyhow!("Timeout waiting to leave state: {:?}", current))?
995 }
996
997 pub async fn close(&mut self) -> Result<()> {
999 if !self.running.load(Ordering::SeqCst) {
1000 return Ok(());
1001 }
1002
1003 info!("Closing PTY session");
1004
1005 let _ = self.write("/exit\r").await;
1007
1008 let timeout_result = timeout(Duration::from_secs(3), async {
1010 let mut rx = self.event_tx.subscribe();
1011 loop {
1012 if let Ok(SessionEvent::Exit(_)) = rx.recv().await {
1013 break;
1014 }
1015 }
1016 })
1017 .await;
1018
1019 if timeout_result.is_err() {
1020 self.kill().await;
1022 }
1023
1024 Ok(())
1025 }
1026
1027 pub async fn kill(&mut self) {
1029 self.running.store(false, Ordering::SeqCst);
1030 if let Some(tx) = self.shutdown_tx.take() {
1031 let _ = tx.send(());
1032 }
1033 *self.pty_writer.lock().await = None;
1034 info!("PTY session killed");
1035 }
1036}
1037
/// Answer passed to `PTYSession::confirm`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConfirmResponse {
    /// Accept: press Enter on the highlighted option.
    Yes,
    /// Reject: press Down twice, then Enter.
    No,
    /// Pick the n-th option (1-based): press Down `n - 1` times, then Enter.
    Option(usize),
}
1044
1045fn semantic_state_to_session_state(state: SemanticState) -> SessionState {
1049 match state {
1050 SemanticState::Starting => SessionState::Starting,
1051 SemanticState::Idle => SessionState::Idle,
1052 SemanticState::Thinking => SessionState::Thinking,
1053 SemanticState::ToolRunning => SessionState::ToolRunning,
1054 SemanticState::Confirming => SessionState::Confirming,
1055 SemanticState::Error => SessionState::Error,
1056 }
1057}
1058
1059fn current_state_to_semantic(state: SessionState) -> SemanticState {
1061 match state {
1062 SessionState::Starting => SemanticState::Starting,
1063 SessionState::Idle => SemanticState::Idle,
1064 SessionState::Thinking => SemanticState::Thinking,
1065 SessionState::Responding => SemanticState::Thinking, SessionState::ToolRunning => SemanticState::ToolRunning,
1067 SessionState::Confirming => SemanticState::Confirming,
1068 SessionState::Error => SemanticState::Error,
1069 SessionState::Exited => SemanticState::Idle, }
1071}
1072
1073fn convert_semantic_confirm_info(info: &SemanticConfirmInfo) -> ConfirmInfo {
1075 let options: Vec<String> = info
1076 .options
1077 .as_ref()
1078 .map(|opts| opts.iter().map(|o| o.label.clone()).collect())
1079 .unwrap_or_default();
1080
1081 let tool = info.tool.as_ref().map(|t| ToolInfo {
1082 name: t.name.clone(),
1083 mcp_server: t.mcp_server.clone(),
1084 params: t
1085 .params
1086 .iter()
1087 .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
1088 .collect(),
1089 });
1090
1091 ConfirmInfo {
1092 confirm_type: format!("{:?}", info.confirm_type).to_lowercase(),
1093 tool,
1094 options,
1095 selected: 0, }
1097}
1098
1099fn classify_stable_op(op: &StableTextOp) -> ScreenTextSource {
1101 let text = op.text();
1102 let trimmed = text.trim_start();
1103
1104 if trimmed.starts_with('>') || trimmed.starts_with('❯') {
1106 return ScreenTextSource::User;
1107 }
1108
1109 if trimmed.starts_with('⎿') || trimmed.starts_with('│') {
1111 return ScreenTextSource::Tool;
1112 }
1113
1114 if trimmed.starts_with('⏺') {
1116 if trimmed.contains('(') && !trimmed.contains("completed") {
1118 return ScreenTextSource::Tool;
1119 }
1120 return ScreenTextSource::Assistant;
1121 }
1122
1123 if trimmed.contains("ctrl+")
1125 || trimmed.contains("Ctrl+")
1126 || trimmed.contains("IDE disconnected")
1127 {
1128 return ScreenTextSource::Ui;
1129 }
1130
1131 if trimmed
1133 .chars()
1134 .any(|c| matches!(c, '╭' | '╮' | '╯' | '╰' | '┌' | '┐' | '└' | '┘' | '─' | '━' | '═'))
1135 {
1136 return ScreenTextSource::Ui;
1137 }
1138
1139 ScreenTextSource::Unknown
1140}