fresh/services/terminal/manager.rs
1//! Terminal Manager - manages multiple terminal sessions
2//!
3//! This module provides a manager for terminal sessions that:
4//! - Spawns PTY processes with proper shell detection
5//! - Manages multiple concurrent terminals
6//! - Routes input/output between the editor and terminal processes
7//! - Handles terminal resize events
8//!
9//! # Role in Incremental Streaming Architecture
10//!
11//! The manager owns the PTY read loop which is the entry point for incremental
12//! scrollback streaming. See `super` module docs for the full architecture overview.
13//!
14//! ## PTY Read Loop
15//!
16//! The read loop in `spawn()` performs incremental streaming: for each PTY read,
17//! it calls `process_output()` to update the terminal grid, then `flush_new_scrollback()`
18//! to append any new scrollback lines to the backing file. This ensures scrollback is
19//! written incrementally as lines scroll off screen, avoiding O(n) work on mode switches.
20
21use super::term::TerminalState;
22use crate::services::async_bridge::AsyncBridge;
23use crate::services::authority::TerminalWrapper;
24use portable_pty::{native_pty_system, CommandBuilder, PtySize};
25use std::collections::HashMap;
26use std::io::{Read, Write};
27use std::sync::atomic::AtomicBool;
28use std::sync::mpsc;
29use std::sync::{Arc, Mutex};
30use std::thread;
31
32pub use fresh_core::TerminalId;
33
/// Messages sent over the command channel to a terminal's writer (I/O) thread.
///
/// `Debug` is derived so commands can be included in trace output and
/// inspected in assertions.
#[derive(Debug)]
enum TerminalCommand {
    /// Raw bytes to write to the PTY.
    Write(Vec<u8>),
    /// Resize the PTY to the given dimensions (in character cells).
    Resize { cols: u16, rows: u16 },
    /// Stop the writer thread, which then kills and reaps the child process.
    Shutdown,
}
43
44/// Handle to a running terminal session
45pub struct TerminalHandle {
46 /// Terminal state (grid, cursor, etc.)
47 pub state: Arc<Mutex<TerminalState>>,
48 /// Command sender to I/O thread
49 command_tx: mpsc::Sender<TerminalCommand>,
50 /// Whether the terminal is still alive
51 alive: Arc<std::sync::atomic::AtomicBool>,
52 /// Current dimensions
53 cols: u16,
54 rows: u16,
55 /// Working directory used for the terminal
56 cwd: Option<std::path::PathBuf>,
57 /// Shell executable used to spawn the terminal
58 shell: String,
59}
60
61impl TerminalHandle {
62 /// Write data to the terminal (sends to PTY)
63 pub fn write(&self, data: &[u8]) {
64 // Receiver may be dropped if terminal exited; nothing to do in that case.
65 #[allow(clippy::let_underscore_must_use)]
66 let _ = self.command_tx.send(TerminalCommand::Write(data.to_vec()));
67 }
68
69 /// Resize the terminal
70 pub fn resize(&mut self, cols: u16, rows: u16) {
71 if cols != self.cols || rows != self.rows {
72 self.cols = cols;
73 self.rows = rows;
74 // Receiver may be dropped if terminal exited; nothing to do in that case.
75 #[allow(clippy::let_underscore_must_use)]
76 let _ = self.command_tx.send(TerminalCommand::Resize { cols, rows });
77 // Also resize the terminal state
78 if let Ok(mut state) = self.state.lock() {
79 state.resize(cols, rows);
80 }
81 }
82 }
83
84 /// Check if the terminal is still running
85 pub fn is_alive(&self) -> bool {
86 self.alive.load(std::sync::atomic::Ordering::Relaxed)
87 }
88
89 /// Shutdown the terminal
90 pub fn shutdown(&self) {
91 // Receiver may be dropped if terminal already exited; nothing to do in that case.
92 #[allow(clippy::let_underscore_must_use)]
93 let _ = self.command_tx.send(TerminalCommand::Shutdown);
94 }
95
96 /// Get current dimensions
97 pub fn size(&self) -> (u16, u16) {
98 (self.cols, self.rows)
99 }
100
101 /// Get the working directory configured for the terminal
102 pub fn cwd(&self) -> Option<std::path::PathBuf> {
103 self.cwd.clone()
104 }
105
106 /// Get the shell executable path used for this terminal
107 pub fn shell(&self) -> &str {
108 &self.shell
109 }
110}
111
/// Manager for multiple terminal sessions.
///
/// Owns one [`TerminalHandle`] per live terminal and hands out the
/// [`TerminalId`]s used to look them up.
pub struct TerminalManager {
    /// Map from terminal ID to the handle of the session it identifies.
    terminals: HashMap<TerminalId, TerminalHandle>,
    /// Numeric value of the next terminal ID; `spawn` advances it on every call.
    next_id: usize,
    /// Async bridge for sending notifications to the main loop.
    /// `None` until `set_async_bridge` is called; `spawn` clones it into the
    /// reader thread of each terminal it creates.
    async_bridge: Option<AsyncBridge>,
}
121
122impl TerminalManager {
123 /// Create a new terminal manager
124 pub fn new() -> Self {
125 Self {
126 terminals: HashMap::new(),
127 next_id: 0,
128 async_bridge: None,
129 }
130 }
131
132 /// Set the async bridge for communication with main loop
133 pub fn set_async_bridge(&mut self, bridge: AsyncBridge) {
134 self.async_bridge = Some(bridge);
135 }
136
137 /// Peek at the next terminal ID that would be assigned.
138 pub fn next_terminal_id(&self) -> TerminalId {
139 TerminalId(self.next_id)
140 }
141
142 /// Spawn a new terminal session
143 ///
144 /// # Arguments
145 /// * `cols` - Initial terminal width in columns
146 /// * `rows` - Initial terminal height in rows
147 /// * `cwd` - Optional working directory (defaults to current directory)
148 /// * `log_path` - Optional path for raw PTY log (for session restore)
149 /// * `backing_path` - Optional path for rendered scrollback (incremental streaming)
150 ///
151 /// # Returns
152 /// The terminal ID if successful
153 pub fn spawn(
154 &mut self,
155 cols: u16,
156 rows: u16,
157 cwd: Option<std::path::PathBuf>,
158 log_path: Option<std::path::PathBuf>,
159 backing_path: Option<std::path::PathBuf>,
160 terminal_wrapper: crate::services::authority::TerminalWrapper,
161 ) -> Result<TerminalId, String> {
162 let id = TerminalId(self.next_id);
163 self.next_id += 1;
164
165 // Try to spawn a real PTY-backed terminal first.
166 let handle_result: Result<TerminalHandle, String> = (|| {
167 // Create PTY
168 let pty_system = native_pty_system();
169 let pty_pair = pty_system
170 .openpty(PtySize {
171 rows,
172 cols,
173 pixel_width: 0,
174 pixel_height: 0,
175 })
176 .map_err(|e| {
177 #[cfg(windows)]
178 {
179 format!(
180 "Failed to open PTY: {}. Note: Terminal requires Windows 10 version 1809 or later with ConPTY support.",
181 e
182 )
183 }
184 #[cfg(not(windows))]
185 {
186 format!("Failed to open PTY: {}", e)
187 }
188 })?;
189
190 // The active authority's terminal wrapper drives the shell
191 // command unconditionally — local wraps `detect_shell()` with
192 // no args; container/remote authorities re-parent into
193 // `docker exec -w …`, `ssh …`, etc. `manages_cwd` says
194 // whether the wrapper's args already establish cwd (in which
195 // case `CommandBuilder::cwd()` is skipped).
196 let TerminalWrapper {
197 command: shell,
198 args: cmd_args,
199 manages_cwd: skip_cwd,
200 } = terminal_wrapper;
201 tracing::info!("Spawning terminal with shell: {}", shell);
202
203 let mut cmd = CommandBuilder::new(&shell);
204 for arg in &cmd_args {
205 cmd.arg(arg);
206 }
207 if !skip_cwd {
208 if let Some(ref dir) = cwd {
209 cmd.cwd(dir);
210 }
211 }
212
213 // Set TERM so programs like less know the terminal capabilities.
214 // The built-in emulator is alacritty-based so xterm-256color is appropriate.
215 cmd.env("TERM", "xterm-256color");
216
217 // On Windows, set additional environment variables that help with ConPTY
218 #[cfg(windows)]
219 {
220 // Ensure PROMPT is set for cmd.exe
221 if shell.to_lowercase().contains("cmd") {
222 cmd.env("PROMPT", "$P$G");
223 }
224 }
225
226 // Spawn the shell process
227 let mut child = pty_pair
228 .slave
229 .spawn_command(cmd)
230 .map_err(|e| format!("Failed to spawn shell '{}': {}", shell, e))?;
231
232 tracing::debug!("Shell process spawned successfully");
233
234 // Create terminal state
235 let state = Arc::new(Mutex::new(TerminalState::new(cols, rows)));
236
237 // Initialize backing_file_history_end if backing file already exists (session restore)
238 // This ensures enter_terminal_mode doesn't truncate existing history to 0
239 if let Some(ref p) = backing_path {
240 if let Ok(metadata) = std::fs::metadata(p) {
241 if metadata.len() > 0 {
242 if let Ok(mut s) = state.lock() {
243 s.set_backing_file_history_end(metadata.len());
244 }
245 }
246 }
247 }
248
249 // Create communication channel
250 let (command_tx, command_rx) = mpsc::channel::<TerminalCommand>();
251
252 // Alive flag
253 let alive = Arc::new(AtomicBool::new(true));
254 let alive_clone = alive.clone();
255
256 // Get master for I/O
257 let mut master = pty_pair
258 .master
259 .take_writer()
260 .map_err(|e| format!("Failed to get PTY writer: {}", e))?;
261
262 let mut reader = pty_pair
263 .master
264 .try_clone_reader()
265 .map_err(|e| format!("Failed to get PTY reader: {}", e))?;
266
267 // Clone state for reader thread
268 let state_clone = state.clone();
269 let async_bridge = self.async_bridge.clone();
270
271 // Optional raw log writer for full-session capture (for live terminal resume)
272 let mut log_writer = log_path
273 .as_ref()
274 .and_then(|p| {
275 std::fs::OpenOptions::new()
276 .create(true)
277 .append(true)
278 .open(p)
279 .ok()
280 })
281 .map(std::io::BufWriter::new);
282
283 // Backing file writer for incremental scrollback streaming
284 // During session restore, the backing file may already contain scrollback content.
285 // We open for append to continue streaming new scrollback after the existing content.
286 // For new terminals, append mode also works (creates file if needed).
287 let mut backing_writer = backing_path
288 .as_ref()
289 .and_then(|p| {
290 // Check if backing file exists and has content (session restore case)
291 let existing_has_content =
292 p.exists() && std::fs::metadata(p).map(|m| m.len() > 0).unwrap_or(false);
293
294 if existing_has_content {
295 // Session restore: open for append to continue streaming new scrollback
296 // The existing content is preserved and loaded into buffer separately.
297 // Note: enter_terminal_mode will truncate when user re-enters terminal.
298 std::fs::OpenOptions::new()
299 .create(true)
300 .append(true)
301 .open(p)
302 .ok()
303 } else {
304 // New terminal: start fresh with truncate
305 std::fs::OpenOptions::new()
306 .create(true)
307 .write(true)
308 .truncate(true)
309 .open(p)
310 .ok()
311 }
312 })
313 .map(std::io::BufWriter::new);
314
315 // Spawn reader thread
316 let terminal_id = id;
317 let pty_response_tx = command_tx.clone();
318 thread::spawn(move || {
319 tracing::debug!("Terminal {:?} reader thread started", terminal_id);
320 let mut buf = [0u8; 4096];
321 let mut total_bytes = 0usize;
322 loop {
323 match reader.read(&mut buf) {
324 Ok(0) => {
325 // EOF - process exited
326 tracing::info!(
327 "Terminal {:?} EOF after {} total bytes",
328 terminal_id,
329 total_bytes
330 );
331 break;
332 }
333 Ok(n) => {
334 total_bytes += n;
335 tracing::debug!(
336 "Terminal {:?} received {} bytes (total: {})",
337 terminal_id,
338 n,
339 total_bytes
340 );
341 // Process output through terminal emulator and stream scrollback
342 if let Ok(mut state) = state_clone.lock() {
343 state.process_output(&buf[..n]);
344
345 // Send any PTY write responses (e.g., DSR cursor position)
346 // This is critical for Windows ConPTY where PowerShell waits
347 // for cursor position response before showing the prompt
348 for response in state.drain_pty_write_queue() {
349 tracing::debug!(
350 "Terminal {:?} sending PTY response: {:?}",
351 terminal_id,
352 response
353 );
354 // Receiver may be dropped if writer thread exited.
355 #[allow(clippy::let_underscore_must_use)]
356 let _ = pty_response_tx
357 .send(TerminalCommand::Write(response.into_bytes()));
358 }
359
360 // Incrementally stream new scrollback lines to backing file
361 if let Some(ref mut writer) = backing_writer {
362 match state.flush_new_scrollback(writer) {
363 Ok(lines_written) => {
364 if lines_written > 0 {
365 // Update the history end offset
366 if let Ok(pos) = writer.get_ref().metadata() {
367 state.set_backing_file_history_end(pos.len());
368 }
369 // Best-effort flush; backing file errors handled below.
370 #[allow(clippy::let_underscore_must_use)]
371 let _ = writer.flush();
372 }
373 }
374 Err(e) => {
375 tracing::warn!(
376 "Terminal backing file write error: {}",
377 e
378 );
379 backing_writer = None;
380 }
381 }
382 }
383 }
384
385 // Append raw bytes to log if available (for session restore replay)
386 if let Some(w) = log_writer.as_mut() {
387 if let Err(e) = w.write_all(&buf[..n]) {
388 tracing::warn!("Terminal log write error: {}", e);
389 log_writer = None; // stop logging on error
390 } else if let Err(e) = w.flush() {
391 tracing::warn!("Terminal log flush error: {}", e);
392 log_writer = None;
393 }
394 }
395
396 // Notify main loop to redraw (receiver may be dropped during shutdown).
397 if let Some(ref bridge) = async_bridge {
398 #[allow(clippy::let_underscore_must_use)]
399 let _ = bridge.sender().send(
400 crate::services::async_bridge::AsyncMessage::TerminalOutput {
401 terminal_id,
402 },
403 );
404 }
405 }
406 Err(e) => {
407 tracing::error!("Terminal read error: {}", e);
408 break;
409 }
410 }
411 }
412 alive_clone.store(false, std::sync::atomic::Ordering::Relaxed);
413 // Best-effort flush of log/backing files during teardown.
414 if let Some(mut w) = log_writer {
415 #[allow(clippy::let_underscore_must_use)]
416 let _ = w.flush();
417 }
418 if let Some(mut w) = backing_writer {
419 #[allow(clippy::let_underscore_must_use)]
420 let _ = w.flush();
421 }
422 // Notify that terminal exited (receiver may be dropped during shutdown).
423 if let Some(ref bridge) = async_bridge {
424 #[allow(clippy::let_underscore_must_use)]
425 let _ = bridge.sender().send(
426 crate::services::async_bridge::AsyncMessage::TerminalExited { terminal_id },
427 );
428 }
429 });
430
431 // Spawn writer thread
432 let pty_size_ref = pty_pair.master;
433 thread::spawn(move || {
434 loop {
435 match command_rx.recv() {
436 Ok(TerminalCommand::Write(data)) => {
437 if let Err(e) = master.write_all(&data) {
438 tracing::error!("Terminal write error: {}", e);
439 break;
440 }
441 // Best-effort flush — PTY write errors are handled above.
442 #[allow(clippy::let_underscore_must_use)]
443 let _ = master.flush();
444 }
445 Ok(TerminalCommand::Resize { cols, rows }) => {
446 if let Err(e) = pty_size_ref.resize(PtySize {
447 rows,
448 cols,
449 pixel_width: 0,
450 pixel_height: 0,
451 }) {
452 tracing::warn!("Failed to resize PTY: {}", e);
453 }
454 }
455 Ok(TerminalCommand::Shutdown) | Err(_) => {
456 break;
457 }
458 }
459 }
460 // Best-effort child process cleanup during teardown.
461 #[allow(clippy::let_underscore_must_use)]
462 let _ = child.kill();
463 #[allow(clippy::let_underscore_must_use)]
464 let _ = child.wait();
465 });
466
467 // Create handle
468 Ok(TerminalHandle {
469 state,
470 command_tx,
471 alive,
472 cols,
473 rows,
474 cwd: cwd.clone(),
475 shell,
476 })
477 })();
478
479 let handle = handle_result?;
480
481 self.terminals.insert(id, handle);
482 tracing::info!("Created terminal {:?} ({}x{})", id, cols, rows);
483
484 Ok(id)
485 }
486
487 /// Get a terminal handle by ID
488 pub fn get(&self, id: TerminalId) -> Option<&TerminalHandle> {
489 self.terminals.get(&id)
490 }
491
492 /// Get a mutable terminal handle by ID
493 pub fn get_mut(&mut self, id: TerminalId) -> Option<&mut TerminalHandle> {
494 self.terminals.get_mut(&id)
495 }
496
497 /// Close a terminal
498 pub fn close(&mut self, id: TerminalId) -> bool {
499 if let Some(handle) = self.terminals.remove(&id) {
500 handle.shutdown();
501 true
502 } else {
503 false
504 }
505 }
506
507 /// Get all terminal IDs
508 pub fn terminal_ids(&self) -> Vec<TerminalId> {
509 self.terminals.keys().copied().collect()
510 }
511
512 /// Get count of open terminals
513 pub fn count(&self) -> usize {
514 self.terminals.len()
515 }
516
517 /// Shutdown all terminals
518 pub fn shutdown_all(&mut self) {
519 for (_, handle) in self.terminals.drain() {
520 handle.shutdown();
521 }
522 }
523
524 /// Clean up dead terminals
525 pub fn cleanup_dead(&mut self) -> Vec<TerminalId> {
526 let dead: Vec<TerminalId> = self
527 .terminals
528 .iter()
529 .filter(|(_, h)| !h.is_alive())
530 .map(|(id, _)| *id)
531 .collect();
532
533 for id in &dead {
534 self.terminals.remove(id);
535 }
536
537 dead
538 }
539}
540
541impl Default for TerminalManager {
542 fn default() -> Self {
543 Self::new()
544 }
545}
546
547impl Drop for TerminalManager {
548 fn drop(&mut self) {
549 self.shutdown_all();
550 }
551}
552
/// Work out which shell executable to launch for the user.
///
/// A non-empty `SHELL` environment variable always wins. Otherwise the
/// platform default is used: `/bin/sh` on Unix; on Windows the first
/// PowerShell candidate that can be found, falling back to `COMSPEC` and
/// finally `cmd.exe`.
pub fn detect_shell() -> String {
    // Explicit user choice via $SHELL takes priority (an empty value is ignored).
    let from_env = std::env::var("SHELL").ok().filter(|value| !value.is_empty());
    if let Some(shell) = from_env {
        return shell;
    }

    // Fall back to platform defaults
    #[cfg(unix)]
    {
        String::from("/bin/sh")
    }
    #[cfg(windows)]
    {
        // On Windows, prefer PowerShell for better ConPTY and ANSI escape support.
        // PowerShell Core (pwsh) is tried first, then Windows PowerShell.
        const POWERSHELL_CANDIDATES: [&str; 3] = [
            "pwsh.exe",
            "powershell.exe",
            r"C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe",
        ];

        POWERSHELL_CANDIDATES
            .iter()
            .copied()
            .find(|candidate| std::path::Path::new(candidate).exists() || which_exists(candidate))
            .map(String::from)
            // No PowerShell found: fall back to COMSPEC (cmd.exe)
            .unwrap_or_else(|| std::env::var("COMSPEC").unwrap_or_else(|_| "cmd.exe".to_string()))
    }
}
587
/// Check if command exists in PATH (Windows)
///
/// Joins `cmd` onto every directory listed in `PATH` and reports whether any
/// of the resulting paths exists. `PATH` is read with `var_os` and split with
/// `std::env::split_paths`, so quoted entries and non-Unicode values are
/// handled by the standard library instead of a manual `';'` split.
/// Returns `false` when `PATH` is not set.
#[cfg(windows)]
fn which_exists(cmd: &str) -> bool {
    std::env::var_os("PATH")
        .map(|path_var| std::env::split_paths(&path_var).any(|dir| dir.join(cmd).exists()))
        .unwrap_or(false)
}
601
#[cfg(test)]
mod tests {
    use super::*;

    /// The `Display` output of a terminal ID is `Terminal-<n>`.
    #[test]
    fn test_terminal_id_display() {
        let rendered = TerminalId(42).to_string();
        assert_eq!(rendered, "Terminal-42");
    }

    /// Shell detection always yields some executable name.
    #[test]
    fn test_detect_shell() {
        assert!(!detect_shell().is_empty());
    }
}
617}