use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use portable_pty::{ChildKiller, MasterPty, PtySize, native_pty_system};
use tastty_core::{TerminalMode, TerminalSize};
use tokio::sync::Notify;
use tracing::{debug, info, info_span};
use crate::Builder;
use crate::error::{Error, Result};
use crate::exit_status::ExitStatus;
use crate::process_group;
use super::Managed;
use super::Terminal;
use super::build_common;
use super::spawn_named;
/// Thread name passed to `spawn_named` for the thread that blocks on the
/// child's exit in `Terminal::spawn`.
const CHILD_WAITER_THREAD_NAME: &str = "tastty-child-waiter";
/// Grace period (100 ms) that `Terminal::kill` waits for the exit state to be
/// published after `process_group::sigterm_group`, before it escalates to
/// `process_group::sigkill_group`.
pub(super) const SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(100);
/// Per-child state that `Terminal::spawn` stores on a `Terminal<Managed>`.
pub(crate) struct ManagedState {
/// Master side of the PTY pair opened in `Terminal::spawn`; locked by
/// `Terminal::resize` to change the PTY size.
pub(super) pty_master: Mutex<Box<dyn MasterPty + Send>>,
/// Value of `child.process_id()` captured right after the spawn; `None`
/// when the child handle did not report a pid.
pub(super) child_pid: Option<u32>,
/// Handle obtained from `child.clone_killer()`; locked by `Terminal::kill`
/// while a signal is being delivered.
pub(super) killer: std::sync::Mutex<Box<dyn ChildKiller + Send + Sync>>,
/// Exit-state channel shared with the child-waiter thread, which is the
/// side that publishes into it.
pub(super) exit_chan: Arc<ExitChannel>,
}
/// Holds the child's exit state and the two primitives used to announce a
/// change of that state to blocking and to async waiters.
pub(super) struct ExitChannel {
/// Current exit state; starts as `ExitState::Pending` (see `ExitChannel::new`).
state: Mutex<ExitState>,
/// Notified on every publish; `wait_for_exit_or_deadline` waits on it.
sync_cvar: Condvar,
/// Notified on every publish; `Terminal::wait_async` awaits it.
async_notify: Notify,
}
/// Exit state of the child as observed through an `ExitChannel`.
#[derive(Clone)]
pub(super) enum ExitState {
/// Nothing has been published yet; this is the initial state.
Pending,
/// The child-waiter thread obtained an exit status from `child.wait()`.
Exited(ExitStatus),
/// The child-waiter thread finished without publishing a status (for
/// example because `child.wait()` returned an error); reported to callers
/// as `Error::ExitStatusUnavailable`.
Unavailable,
}
impl ExitChannel {
    /// Builds a channel whose state starts out as [`ExitState::Pending`].
    fn new() -> Self {
        let state = Mutex::new(ExitState::Pending);
        let sync_cvar = Condvar::new();
        let async_notify = Notify::new();
        Self {
            state,
            sync_cvar,
            async_notify,
        }
    }

    /// Locks the state mutex, taking the guard even when the mutex is poisoned.
    fn lock_state(&self) -> std::sync::MutexGuard<'_, ExitState> {
        self.state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Wakes every blocking waiter first, then every async waiter.
    fn wake_all(&self) {
        self.sync_cvar.notify_all();
        self.async_notify.notify_waiters();
    }

    /// Unconditionally replaces the state with `new` and wakes all waiters.
    fn publish(&self, new: ExitState) {
        // The temporary guard is released at the end of this statement, so
        // waiters are woken with the lock already free.
        *self.lock_state() = new;
        self.wake_all();
    }

    /// Stores `new` and wakes all waiters, but only if the state is still
    /// [`ExitState::Pending`]; otherwise leaves everything untouched.
    fn publish_if_pending(&self, new: ExitState) {
        let mut slot = self.lock_state();
        if !matches!(*slot, ExitState::Pending) {
            return;
        }
        *slot = new;
        // Release the lock before notifying.
        drop(slot);
        self.wake_all();
    }

    /// Returns a clone of the current state.
    fn snapshot(&self) -> ExitState {
        let current = self.lock_state();
        ExitState::clone(&current)
    }
}
impl Terminal<Managed> {
/// Opens a PTY, spawns the builder's command on its slave side and returns a
/// managed terminal wired to that PTY.
///
/// Steps, in order: split `builder` into command and options, open a PTY of
/// the configured size, optionally disable echo (unix only, when
/// `opts.echo` is false), spawn the command, start the child-waiter thread,
/// clone the PTY reader, take the PTY writer, and hand both to
/// `build_common`.
///
/// # Errors
///
/// Returns `Error::OpenPtyFailed`, `Error::SpawnCommandFailed`,
/// `Error::CloneReaderFailed` or `Error::TakeWriterFailed` when the
/// corresponding PTY operation fails, and propagates errors from
/// `Builder::into_parts`, `disable_echo`, `spawn_named` and `build_common`.
pub fn spawn(builder: Builder) -> Result<Self> {
let (cmd, mut opts) = builder.into_parts()?;
let pty_size = opts.pty_size();
// Everything logged below is recorded inside this span while `_guard` lives.
let span = info_span!(
"terminal_session_spawn",
rows = pty_size.rows,
cols = pty_size.cols,
pixel_width = pty_size.pixel_width,
pixel_height = pty_size.pixel_height,
);
let _guard = span.enter();
info!("tastty spawn begin");
let pty_system = native_pty_system();
debug!("calling native_pty_system.openpty");
let pair = pty_system
.openpty(pty_size)
.map_err(|source| Error::OpenPtyFailed {
source: source.into(),
})?;
debug!("openpty succeeded");
// Echo is switched off before the command is spawned on the slave.
#[cfg(unix)]
if !opts.echo {
debug!("disabling PTY echo");
disable_echo(&*pair.master)?;
debug!("PTY echo disabled");
}
debug!("calling slave.spawn_command");
let mut child =
pair.slave
.spawn_command(cmd)
.map_err(|source| Error::SpawnCommandFailed {
source: source.into(),
})?;
debug!("spawn_command succeeded");
let child_pid = child.process_id();
// A missing pid is logged as 0.
info!(child_pid = child_pid.unwrap_or(0), "child process spawned");
// The killer is cloned here because `child` itself is moved into the
// waiter thread below.
let killer = child.clone_killer();
let exit_chan = Arc::new(ExitChannel::new());
let exit_chan_for_waiter = Arc::clone(&exit_chan);
// The sender half is handed to `build_common` further down; the waiter
// thread keeps the receiver.
let (reader_done_tx, reader_done_rx) = std::sync::mpsc::channel::<()>();
let _waiter = spawn_named(CHILD_WAITER_THREAD_NAME, move || {
// Drop guard: whenever the closure ends with the state still `Pending`
// (e.g. `child.wait()` failed), the channel is moved to `Unavailable` so
// waiters do not block forever.
struct PublishOnDrop(Arc<ExitChannel>);
impl Drop for PublishOnDrop {
fn drop(&mut self) {
self.0.publish_if_pending(ExitState::Unavailable);
}
}
let guard = PublishOnDrop(exit_chan_for_waiter);
if let Ok(status) = child.wait() {
// Blocks until the sender sends or is dropped (either outcome is
// ignored); if `spawn` returns early with an error the sender is
// dropped and this returns immediately.
// NOTE(review): presumably this delays publishing the exit status until
// the reader side is done — confirm against `build_common`.
let _done = reader_done_rx.recv();
guard
.0
.publish(ExitState::Exited(ExitStatus::from_portable(status)));
}
})?;
debug!("cloning PTY reader");
let reader = pair
.master
.try_clone_reader()
.map_err(|source| Error::CloneReaderFailed {
source: source.into(),
})?;
debug!("PTY reader clone succeeded");
debug!("taking PTY writer");
let writer = pair
.master
.take_writer()
.map_err(|source| Error::TakeWriterFailed {
source: source.into(),
})?;
debug!("PTY writer acquired");
debug!("building terminal session");
let common = build_common(reader, writer, &mut opts, Some(reader_done_tx))?;
info!(child_pid = child_pid.unwrap_or(0), "tastty spawn complete");
Ok(Self {
parser: common.parser,
writer_tx: Some(common.writer_tx),
// Only the master half of the pair is kept in the terminal.
managed: Some(ManagedState {
pty_master: Mutex::new(pair.master),
child_pid,
killer: std::sync::Mutex::new(killer),
exit_chan,
}),
reader_handle: Some(common.reader_handle),
writer_handle: Some(common.writer_handle),
dirty: common.dirty,
redraw_notify: common.redraw_notify,
event_rx: std::sync::Mutex::new(Some(common.event_rx)),
io_error_rx: std::sync::Mutex::new(Some(common.io_error_rx)),
last_sync_arrival_ms: common.last_sync_arrival_ms,
epoch: common.epoch,
virtual_cols: common.virtual_cols,
key_callback: common.key_callback,
_mode: std::marker::PhantomData,
})
}
/// Returns the managed-child state of this terminal.
///
/// # Panics
///
/// Panics if `self.managed` is `None`, which is treated as a broken invariant
/// for `Terminal<Managed>`.
fn managed_state(&self) -> &ManagedState {
    let state: Option<&ManagedState> = self.managed.as_ref();
    state.expect("Terminal<Managed> always carries managed state")
}
pub fn resize(&self, size: TerminalSize) -> Result<()> {
let TerminalSize { rows, cols } = size;
if cols == 0 || rows == 0 {
return Err(Error::InvalidResize { rows, cols });
}
let (pixel_cell, in_band) = {
let parser = self.parser_read();
(
parser.screen().pixel_cell_size(),
parser.screen().mode(TerminalMode::InBandResize),
)
};
let pixel_width = cols.saturating_mul(pixel_cell.width);
let pixel_height = rows.saturating_mul(pixel_cell.height);
let pty_size = PtySize {
rows,
cols,
pixel_width,
pixel_height,
};
self.managed_state()
.pty_master
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.resize(pty_size)
.map_err(|source| Error::ResizeFailed {
rows,
cols,
source: source.into(),
})?;
let parser_cols = self.virtual_cols.unwrap_or(cols);
{
let mut parser = self.parser_write();
parser.screen_mut().set_size(TerminalSize {
rows,
cols: parser_cols,
});
}
if in_band {
let resp = format!("\x1b[48;{rows};{cols};{pixel_height};{pixel_width}t");
self.send(resp.as_bytes())?;
}
Ok(())
}
/// Non-blocking check of the child's exit state.
///
/// Returns `Ok(None)` while the state is still pending and `Ok(Some(status))`
/// once an exit status has been published.
///
/// # Errors
///
/// Returns `Error::ExitStatusUnavailable` when the exit state is
/// `Unavailable`.
pub fn try_wait(&self) -> Result<Option<ExitStatus>> {
    let managed = self.managed_state();
    try_wait_inner(managed)
}
/// Waits asynchronously until a final exit state has been published.
///
/// Returns the exit status once the state is `Exited`.
///
/// # Errors
///
/// Returns `Error::ExitStatusUnavailable` once the state is `Unavailable`.
pub async fn wait_async(&self) -> Result<ExitStatus> {
let chan = &self.managed_state().exit_chan;
loop {
// The `Notified` future is created and enabled *before* the state is
// read, so a publish that lands between the snapshot and the `.await`
// below still wakes this task (see tokio's `Notified::enable`).
let notified = chan.async_notify.notified();
tokio::pin!(notified);
notified.as_mut().enable();
match chan.snapshot() {
ExitState::Exited(status) => return Ok(status),
ExitState::Unavailable => return Err(Error::ExitStatusUnavailable),
// Still pending: fall through and wait for the next publish.
ExitState::Pending => {}
}
notified.await;
}
}
/// Reports whether the exit state has left `Pending`, i.e. whether it is
/// either `Exited` or `Unavailable`.
#[must_use]
pub fn is_finished(&self) -> bool {
    match self.managed_state().exit_chan.snapshot() {
        ExitState::Pending => false,
        ExitState::Exited(_) | ExitState::Unavailable => true,
    }
}
/// Terminates the child, escalating to a forced kill if necessary.
///
/// Does nothing when the exit state is already final or when no child pid
/// was recorded. Otherwise calls `process_group::sigterm_group`, waits up to
/// `SHUTDOWN_TIMEOUT` for the exit state to be published, and calls
/// `process_group::sigkill_group` if the state is still pending afterwards.
///
/// # Errors
///
/// Returns `Error::TerminateFailed` when the first call fails and
/// `Error::ForceKillFailed` when the escalation fails.
pub fn kill(&self) -> Result<()> {
    if self.is_finished() {
        return Ok(());
    }
    let managed = self.managed_state();
    let pid = match managed.child_pid {
        Some(pid) => pid,
        None => return Ok(()),
    };

    // Polite phase: the killer lock is held only while the signal is sent,
    // not during the wait that follows.
    {
        let mut handle = managed
            .killer
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        process_group::sigterm_group(pid, handle.as_mut())
            .map_err(|source| Error::TerminateFailed { source })?;
    }

    wait_for_exit_or_deadline(managed, SHUTDOWN_TIMEOUT);
    if self.is_finished() {
        return Ok(());
    }

    // Forced phase: the exit state is still pending after the grace period.
    let mut handle = managed
        .killer
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    process_group::sigkill_group(pid, handle.as_mut())
        .map_err(|source| Error::ForceKillFailed { source })?;
    Ok(())
}
/// Returns the child pid recorded at spawn time, or `None` if none was
/// reported.
#[must_use]
pub fn process_id(&self) -> Option<u32> {
    let managed = self.managed_state();
    managed.child_pid
}
}
fn try_wait_inner(managed: &ManagedState) -> Result<Option<ExitStatus>> {
match managed.exit_chan.snapshot() {
ExitState::Exited(status) => Ok(Some(status)),
ExitState::Pending => Ok(None),
ExitState::Unavailable => Err(Error::ExitStatusUnavailable),
}
}
/// Returns the exit status if one has been published; both the pending and
/// the unavailable state yield `None`.
pub(super) fn poll_exit(managed: &ManagedState) -> Option<ExitStatus> {
    let state = managed.exit_chan.snapshot();
    if let ExitState::Exited(status) = state {
        Some(status)
    } else {
        None
    }
}
pub(super) fn wait_for_exit_or_deadline(managed: &ManagedState, timeout: Duration) {
let chan = &*managed.exit_chan;
let guard = chan.state.lock().unwrap_or_else(|e| e.into_inner());
drop(
chan.sync_cvar
.wait_timeout_while(guard, timeout, |s| matches!(s, ExitState::Pending))
.unwrap_or_else(|e| e.into_inner()),
);
}
#[cfg(unix)]
fn disable_echo(master: &dyn MasterPty) -> Result<()> {
use nix::sys::termios::{LocalFlags, SetArg, tcgetattr, tcsetattr};
use std::fs::OpenOptions;
use std::os::unix::fs::OpenOptionsExt;
let Some(path) = master.tty_name() else {
return Ok(());
};
let tty = OpenOptions::new()
.read(true)
.write(true)
.custom_flags(libc::O_NOCTTY)
.open(path)
.map_err(|source| Error::DisableEchoFailed { source })?;
let mut termios = tcgetattr(&tty).map_err(|e| Error::DisableEchoFailed { source: e.into() })?;
termios.local_flags &= !LocalFlags::ECHO;
tcsetattr(&tty, SetArg::TCSANOW, &termios)
.map_err(|e| Error::DisableEchoFailed { source: e.into() })?;
Ok(())
}