use anyhow::{Context, Result};
use russh::{client::Msg, Channel};
use signal_hook::{consts::SIGWINCH, iterator::Signals};
use smallvec::SmallVec;
use terminal_size::{terminal_size, Height, Width};
use tokio::sync::{mpsc, watch};
use tokio::time::Duration;
// Submodules: `session` supplies `PtySession`; `terminal` supplies the
// terminal-state items re-exported below.
pub mod session;
pub mod terminal;
// Re-exported so callers can name these items directly from this module.
pub use session::PtySession;
pub use terminal::{force_terminal_cleanup, TerminalState, TerminalStateGuard};
/// Length, in milliseconds, of the periodic sleep arm in
/// `PtyManager::run_multiplex_sessions`.
const SESSION_PROCESSING_INTERVAL_MS: u64 = 100;
/// Configuration handed to `PtySession::new` when a session is created.
///
/// Within this file only `force_pty` and `disable_pty` are read (by
/// `utils::should_allocate_pty`); the remaining fields are consumed elsewhere.
#[derive(Debug, Clone)]
pub struct PtyConfig {
/// Terminal type string; the `Default` impl uses `"xterm-256color"`.
pub term_type: String,
/// When set (and `disable_pty` is not), `should_allocate_pty` answers `true`
/// without inspecting the standard streams.
pub force_pty: bool,
/// When set, `should_allocate_pty` answers `false`; checked before `force_pty`.
pub disable_pty: bool,
/// Presumably toggles mouse reporting; not read in this file (confirm in `session`).
pub enable_mouse: bool,
/// Timeout whose use is outside this file; the `Default` impl sets 10 ms.
pub timeout: Duration,
}
impl Default for PtyConfig {
    /// Builds the baseline configuration: `xterm-256color` terminal type, every
    /// boolean switch off, and a 10 ms timeout.
    fn default() -> Self {
        const DEFAULT_PTY_TIMEOUT_MS: u64 = 10;

        let timeout = Duration::from_millis(DEFAULT_PTY_TIMEOUT_MS);
        let term_type = String::from("xterm-256color");

        Self {
            term_type,
            force_pty: false,
            disable_pty: false,
            enable_mouse: false,
            timeout,
        }
    }
}
/// Lifecycle phase of a PTY, as implied by the variant names.
///
/// NOTE(review): this file only declares the type; nothing here reads or sets
/// it. The transitions are presumably driven from `session` (to be confirmed).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PtyState {
Inactive,
Initializing,
Active,
ShuttingDown,
Closed,
}
/// Message kinds for PTY data and control events.
///
/// NOTE(review): nothing in this file constructs or matches on these values;
/// the producers and consumers presumably live in `session` (to be confirmed).
#[derive(Debug)]
pub enum PtyMessage {
/// Bytes originating locally (presumably keyboard input). Up to 8 bytes are
/// held inline before the `SmallVec` spills to the heap.
LocalInput(SmallVec<[u8; 8]>),
/// Bytes originating remotely. Up to 64 bytes are held inline before the
/// `SmallVec` spills to the heap.
RemoteOutput(SmallVec<[u8; 64]>),
/// New terminal dimensions; units are presumably character cells (TODO confirm).
Resize { width: u32, height: u32 },
/// Payload-free variant; by its name, a request to end the session.
Terminate,
/// Carries an error description as text.
Error(String),
}
/// Owns the PTY sessions created through it, plus a `watch` channel carrying a
/// boolean cancellation flag.
pub struct PtyManager {
/// Sessions in creation order; a session's id is the index it was pushed at.
active_sessions: Vec<PtySession>,
/// Sending half of the cancellation flag; `shutdown` sets it to `true`.
cancel_tx: watch::Sender<bool>,
/// Receiving half; `run_multiplex_sessions` clones it to watch for cancellation.
cancel_rx: watch::Receiver<bool>,
}
impl PtyManager {
/// Creates a manager that holds no sessions and whose cancellation flag
/// starts out as `false`.
pub fn new() -> Self {
    let active_sessions = Vec::new();
    let (cancel_tx, cancel_rx) = watch::channel(false);

    Self { active_sessions, cancel_tx, cancel_rx }
}
/// Builds a `PtySession` on `channel` with `config`, stores it, and returns
/// its id.
///
/// The id is the number of sessions held before the new one is added, i.e.
/// the index the session ends up at in the internal list.
///
/// # Errors
///
/// Propagates any error from `PtySession::new`; nothing is stored in that case.
pub async fn create_single_session(
    &mut self,
    channel: Channel<Msg>,
    config: PtyConfig,
) -> Result<usize> {
    // The next free index doubles as the session id.
    let next_id = self.active_sessions.len();
    let new_session = PtySession::new(next_id, channel, config).await?;

    self.active_sessions.push(new_session);
    Ok(next_id)
}
/// Creates one session per entry of `channels`, each with its own clone of
/// `config`, and returns the ids in the same order as the channels.
///
/// # Errors
///
/// Stops at the first session that fails to be created and returns that
/// error; sessions created before the failure stay registered.
pub async fn create_multiplex_sessions(
    &mut self,
    channels: Vec<Channel<Msg>>,
    config: PtyConfig,
) -> Result<Vec<usize>> {
    let mut created = Vec::with_capacity(channels.len());

    for channel in channels {
        let id = self.create_single_session(channel, config.clone()).await?;
        created.push(id);
    }

    Ok(created)
}
/// Runs the session identified by `session_id` until its `run` future
/// completes, then forces terminal cleanup and returns the run's result.
///
/// # Errors
///
/// Fails with "PTY session {session_id} not found" when the id is out of
/// range (the cleanup call is skipped in that case); otherwise returns
/// whatever `PtySession::run` produced.
pub async fn run_single_session(&mut self, session_id: usize) -> Result<()> {
    let session = match self.active_sessions.get_mut(session_id) {
        Some(session) => session,
        None => anyhow::bail!("PTY session {session_id} not found"),
    };

    let outcome = session.run().await;

    // Cleanup runs whether the session succeeded or failed.
    crate::pty::terminal::force_terminal_cleanup();
    outcome
}
pub async fn run_multiplex_sessions(&mut self, session_ids: Vec<usize>) -> Result<()> {
if session_ids.is_empty() {
anyhow::bail!("No PTY sessions to run");
}
let mut active_session = session_ids[0];
const SESSION_SWITCH_CHANNEL_SIZE: usize = 32;
let (_switch_tx, mut _switch_rx) = mpsc::channel::<usize>(SESSION_SWITCH_CHANNEL_SIZE);
let mut cancel_rx = self.cancel_rx.clone();
loop {
tokio::select! {
_ = cancel_rx.changed() => {
if *cancel_rx.borrow() {
tracing::debug!("PTY multiplex received cancellation signal");
break;
}
}
new_session = _switch_rx.recv() => {
match new_session {
Some(session_id) => {
if session_ids.contains(&session_id) {
active_session = session_id;
println!("Switched to PTY session {session_id}");
} else {
eprintln!("Invalid PTY session: {session_id}");
}
}
None => {
break;
}
}
}
_ = tokio::time::sleep(Duration::from_millis(SESSION_PROCESSING_INTERVAL_MS)) => {
if let Some(_session) = self.active_sessions.get_mut(active_session) {
}
}
}
}
Ok(())
}
pub async fn shutdown(&mut self) -> Result<()> {
let _ = self.cancel_tx.send(true);
let shutdown_futures: Vec<_> = self
.active_sessions
.iter_mut()
.map(|session| session.shutdown())
.collect();
const PTY_SHUTDOWN_TIMEOUT_SECS: u64 = 5;
let shutdown_timeout = Duration::from_secs(PTY_SHUTDOWN_TIMEOUT_SECS);
tokio::select! {
results = futures::future::try_join_all(shutdown_futures) => {
match results {
Ok(_) => tracing::debug!("All PTY sessions shutdown successfully"),
Err(e) => tracing::warn!("Some PTY sessions failed to shutdown cleanly: {e}"),
}
}
_ = tokio::time::sleep(shutdown_timeout) => {
tracing::warn!("PTY session shutdown timed out after {} seconds", shutdown_timeout.as_secs());
}
}
self.active_sessions.clear();
Ok(())
}
}
impl Default for PtyManager {
fn default() -> Self {
Self::new()
}
}
pub mod utils {
use super::*;
pub fn should_allocate_pty(config: &PtyConfig) -> Result<bool> {
if config.disable_pty {
return Ok(false);
}
if config.force_pty {
return Ok(true);
}
Ok(atty::is(atty::Stream::Stdin) && atty::is(atty::Stream::Stdout))
}
pub fn get_terminal_size() -> Result<(u32, u32)> {
if let Some((Width(w), Height(h))) = terminal_size() {
Ok((u32::from(w), u32::from(h)))
} else {
Ok((80, 24))
}
}
pub fn setup_resize_handler() -> Result<Signals> {
let signals = Signals::new([SIGWINCH])
.with_context(|| "Failed to register SIGWINCH signal handler")?;
Ok(signals)
}
pub fn has_controlling_terminal() -> bool {
atty::is(atty::Stream::Stdin) && atty::is(atty::Stream::Stdout)
}
}
pub use utils::*;