bssh/pty/
mod.rs

1// Copyright 2025 Lablup Inc. and Jeongkyu Shin
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! PTY (Pseudo-terminal) support for interactive SSH sessions.
16//!
17//! This module provides true PTY allocation with full terminal emulation capabilities
18//! including terminal resize handling, raw mode support, and proper handling of colors
19//! and special keys.
20
21use anyhow::{Context, Result};
22use russh::{client::Msg, Channel};
23use signal_hook::{consts::SIGWINCH, iterator::Signals};
24use smallvec::SmallVec;
25use terminal_size::{terminal_size, Height, Width};
26use tokio::sync::{mpsc, watch};
27use tokio::time::Duration;
28
29pub mod session;
30pub mod terminal;
31
32pub use session::PtySession;
33pub use terminal::{force_terminal_cleanup, TerminalState, TerminalStateGuard};
34
35/// Session processing interval for multiplex mode
36/// - 100ms provides reasonable time-slicing for multiplex mode
37/// - Allows other async tasks to run without starving
38/// - Not critical for responsiveness as actual I/O is event-driven
39const SESSION_PROCESSING_INTERVAL_MS: u64 = 100;
40
41/// PTY session configuration
42#[derive(Debug, Clone)]
43pub struct PtyConfig {
44    /// Terminal type (e.g., "xterm-256color", "xterm", "vt100")
45    pub term_type: String,
46    /// Whether to force PTY allocation
47    pub force_pty: bool,
48    /// Whether to disable PTY allocation
49    pub disable_pty: bool,
50    /// Enable mouse event support
51    pub enable_mouse: bool,
52    /// Terminal input/output timeout
53    pub timeout: Duration,
54}
55
56impl Default for PtyConfig {
57    fn default() -> Self {
58        // Default PTY configuration timeout design:
59        // - 10ms provides rapid response to input/output events
60        // - Short enough to feel instantaneous to users (<20ms threshold)
61        // - Balances CPU usage with responsiveness for interactive terminals
62        const DEFAULT_PTY_TIMEOUT_MS: u64 = 10;
63
64        Self {
65            term_type: "xterm-256color".to_string(),
66            force_pty: false,
67            disable_pty: false,
68            enable_mouse: false,
69            timeout: Duration::from_millis(DEFAULT_PTY_TIMEOUT_MS),
70        }
71    }
72}
73
74/// PTY session state
75#[derive(Debug, Clone, Copy, PartialEq, Eq)]
76pub enum PtyState {
77    /// PTY is not active
78    Inactive,
79    /// PTY is initializing
80    Initializing,
81    /// PTY is active and ready
82    Active,
83    /// PTY is being shut down
84    ShuttingDown,
85    /// PTY has been closed
86    Closed,
87}
88
89/// Terminal input/output message
90/// Uses SmallVec to avoid heap allocations for small messages (typical for key presses)
91#[derive(Debug)]
92pub enum PtyMessage {
93    /// Data from local terminal to send to remote
94    /// SmallVec<[u8; 8]> keeps key sequences stack-allocated
95    LocalInput(SmallVec<[u8; 8]>),
96    /// Data from remote to display on local terminal
97    /// SmallVec<[u8; 64]> handles most terminal output without allocation
98    RemoteOutput(SmallVec<[u8; 64]>),
99    /// Terminal resize event
100    Resize { width: u32, height: u32 },
101    /// PTY session should terminate
102    Terminate,
103    /// Error occurred
104    Error(String),
105}
106
107/// PTY manager for handling multiple PTY sessions
108pub struct PtyManager {
109    active_sessions: Vec<PtySession>,
110    cancel_tx: watch::Sender<bool>,
111    cancel_rx: watch::Receiver<bool>,
112}
113
114impl PtyManager {
115    /// Create a new PTY manager
116    pub fn new() -> Self {
117        let (cancel_tx, cancel_rx) = watch::channel(false);
118        Self {
119            active_sessions: Vec::new(),
120            cancel_tx,
121            cancel_rx,
122        }
123    }
124
125    /// Create a PTY session for a single node
126    pub async fn create_single_session(
127        &mut self,
128        channel: Channel<Msg>,
129        config: PtyConfig,
130    ) -> Result<usize> {
131        let session_id = self.active_sessions.len();
132        let session = PtySession::new(session_id, channel, config).await?;
133        self.active_sessions.push(session);
134        Ok(session_id)
135    }
136
137    /// Create PTY sessions for multiple nodes with multiplexing
138    pub async fn create_multiplex_sessions(
139        &mut self,
140        channels: Vec<Channel<Msg>>,
141        config: PtyConfig,
142    ) -> Result<Vec<usize>> {
143        let mut session_ids = Vec::new();
144        for channel in channels {
145            let session_id = self.create_single_session(channel, config.clone()).await?;
146            session_ids.push(session_id);
147        }
148        Ok(session_ids)
149    }
150
151    /// Run a single PTY session
152    pub async fn run_single_session(&mut self, session_id: usize) -> Result<()> {
153        let result = if let Some(session) = self.active_sessions.get_mut(session_id) {
154            session.run().await
155        } else {
156            anyhow::bail!("PTY session {session_id} not found")
157        };
158
159        // Ensure terminal is properly restored after session ends
160        // Use synchronized cleanup from terminal module
161        crate::pty::terminal::force_terminal_cleanup();
162
163        result
164    }
165
166    /// Run multiple PTY sessions with session switching
167    pub async fn run_multiplex_sessions(&mut self, session_ids: Vec<usize>) -> Result<()> {
168        if session_ids.is_empty() {
169            anyhow::bail!("No PTY sessions to run");
170        }
171
172        // Start with the first session active
173        let mut active_session = session_ids[0];
174
175        // Set up bounded channels for communication between sessions
176        // Session switching channel sizing:
177        // - 32 capacity handles burst session switches without blocking
178        // - Session switches are infrequent user actions, small buffer sufficient
179        // - Prevents memory exhaustion from accumulated switch commands
180        const SESSION_SWITCH_CHANNEL_SIZE: usize = 32;
181        let (_switch_tx, mut _switch_rx) = mpsc::channel::<usize>(SESSION_SWITCH_CHANNEL_SIZE);
182
183        // Run the multiplexed session loop using select! for efficient event handling
184        let mut cancel_rx = self.cancel_rx.clone();
185
186        loop {
187            tokio::select! {
188                // Check for cancellation signal
189                _ = cancel_rx.changed() => {
190                    if *cancel_rx.borrow() {
191                        tracing::debug!("PTY multiplex received cancellation signal");
192                        break;
193                    }
194                }
195
196                // Check for session switch commands
197                new_session = _switch_rx.recv() => {
198                    match new_session {
199                        Some(session_id) => {
200                            if session_ids.contains(&session_id) {
201                                active_session = session_id;
202                                println!("Switched to PTY session {session_id}");
203                            } else {
204                                eprintln!("Invalid PTY session: {session_id}");
205                            }
206                        }
207                        None => {
208                            // Switch channel closed
209                            break;
210                        }
211                    }
212                }
213
214                // Run active session processing
215                // Session processing interval design:
216                // - 100ms provides reasonable time-slicing for multiplex mode
217                // - Allows other async tasks to run without starving
218                // - Not critical for responsiveness as actual I/O is event-driven
219                _ = tokio::time::sleep(Duration::from_millis(SESSION_PROCESSING_INTERVAL_MS)) => {
220                    // TODO: Implement session time-slicing for multiplex mode
221                    // For now, just continue the loop
222                    if let Some(_session) = self.active_sessions.get_mut(active_session) {
223                        // Session processing would go here
224                    }
225                }
226            }
227        }
228
229        Ok(())
230    }
231
232    /// Shutdown all PTY sessions with proper select!-based cleanup
233    pub async fn shutdown(&mut self) -> Result<()> {
234        // Signal cancellation to all operations
235        let _ = self.cancel_tx.send(true);
236
237        // Use select! to handle concurrent shutdown of multiple sessions
238        let shutdown_futures: Vec<_> = self
239            .active_sessions
240            .iter_mut()
241            .map(|session| session.shutdown())
242            .collect();
243
244        // Wait for all sessions to shutdown with timeout
245        // PTY manager shutdown timeout design:
246        // - 5 seconds allows time for multiple sessions to cleanup gracefully
247        // - Long enough for network operations to complete (channel close, etc.)
248        // - Prevents indefinite hang if some sessions don't respond to shutdown
249        // - After timeout, remaining sessions are abandoned (memory cleanup via Drop)
250        const PTY_SHUTDOWN_TIMEOUT_SECS: u64 = 5;
251        let shutdown_timeout = Duration::from_secs(PTY_SHUTDOWN_TIMEOUT_SECS);
252
253        tokio::select! {
254            results = futures::future::try_join_all(shutdown_futures) => {
255                match results {
256                    Ok(_) => tracing::debug!("All PTY sessions shutdown successfully"),
257                    Err(e) => tracing::warn!("Some PTY sessions failed to shutdown cleanly: {e}"),
258                }
259            }
260            _ = tokio::time::sleep(shutdown_timeout) => {
261                tracing::warn!("PTY session shutdown timed out after {} seconds", shutdown_timeout.as_secs());
262            }
263        }
264
265        self.active_sessions.clear();
266        Ok(())
267    }
268}
269
270impl Default for PtyManager {
271    fn default() -> Self {
272        Self::new()
273    }
274}
275
276/// Utility functions for PTY operations
277pub mod utils {
278    use super::*;
279
280    /// Check if PTY should be allocated based on configuration and terminal state
281    pub fn should_allocate_pty(config: &PtyConfig) -> Result<bool> {
282        if config.disable_pty {
283            return Ok(false);
284        }
285
286        if config.force_pty {
287            return Ok(true);
288        }
289
290        // Auto-detect if we're in an interactive terminal
291        Ok(atty::is(atty::Stream::Stdin) && atty::is(atty::Stream::Stdout))
292    }
293
294    /// Get current terminal size
295    pub fn get_terminal_size() -> Result<(u32, u32)> {
296        if let Some((Width(w), Height(h))) = terminal_size() {
297            Ok((u32::from(w), u32::from(h)))
298        } else {
299            // Default size if terminal size cannot be determined
300            Ok((80, 24))
301        }
302    }
303
304    /// Setup terminal resize signal handler
305    pub fn setup_resize_handler() -> Result<Signals> {
306        let signals = Signals::new([SIGWINCH])
307            .with_context(|| "Failed to register SIGWINCH signal handler")?;
308        Ok(signals)
309    }
310
311    /// Check if the current process has controlling terminal
312    pub fn has_controlling_terminal() -> bool {
313        atty::is(atty::Stream::Stdin) && atty::is(atty::Stream::Stdout)
314    }
315}
316
317// Re-export key types
318pub use utils::*;