use std::fmt;
use std::io::{Read, Write};
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;
use std::time::Instant;
use bytes::Bytes;
use tastty_core::frame::{bracketed_paste, focus_report};
use tastty_core::{
KeyEncoder, KeyEvent, MouseEncoder, MouseEvent, Parser, Screen, ScreenEvent, TerminalMode,
TerminalSize,
};
use tokio::sync::mpsc;
use tracing::{debug, debug_span, warn};
use crate::error::{Error, IoError, Result};
use crate::process_group;
mod io;
mod managed;
mod options;
mod unmanaged;
pub use io::WriterHandle;
pub use options::{KeyAction, SessionOptions};
use io::{JOIN_TIMEOUT, ReaderContext, join_with_timeout, spawn_reader, spawn_writer};
use managed::{ManagedState, SHUTDOWN_TIMEOUT, poll_exit, wait_for_exit_or_deadline};
use options::KeyCallback;
mod sealed {
pub trait Sealed {}
}
pub trait TerminalOwnership: sealed::Sealed {}
pub struct Managed;
pub struct Unmanaged;
impl sealed::Sealed for Managed {}
impl sealed::Sealed for Unmanaged {}
impl TerminalOwnership for Managed {}
impl TerminalOwnership for Unmanaged {}
pub type ManagedTerminal = Terminal<Managed>;
pub type UnmanagedTerminal = Terminal<Unmanaged>;
pub struct Terminal<Mode: TerminalOwnership = Managed> {
pub(super) parser: Arc<RwLock<Parser>>,
pub(super) writer_tx: Option<mpsc::Sender<Bytes>>,
pub(super) managed: Option<ManagedState>,
pub(super) reader_handle: Option<std::thread::JoinHandle<()>>,
pub(super) writer_handle: Option<std::thread::JoinHandle<()>>,
pub(super) dirty: Arc<AtomicBool>,
pub(super) redraw_notify: Arc<tokio::sync::Notify>,
pub(super) event_rx: std::sync::Mutex<Option<mpsc::UnboundedReceiver<ScreenEvent>>>,
pub(super) io_error_rx: std::sync::Mutex<Option<mpsc::UnboundedReceiver<IoError>>>,
pub(super) last_sync_arrival_ms: Arc<AtomicU64>,
pub(super) epoch: Instant,
pub(super) virtual_cols: Option<u16>,
pub(super) key_callback: Option<KeyCallback>,
pub(super) _mode: std::marker::PhantomData<Mode>,
}
pub(super) struct CommonParts {
pub(super) parser: Arc<RwLock<Parser>>,
pub(super) writer_tx: mpsc::Sender<Bytes>,
pub(super) reader_handle: std::thread::JoinHandle<()>,
pub(super) writer_handle: std::thread::JoinHandle<()>,
pub(super) dirty: Arc<AtomicBool>,
pub(super) redraw_notify: Arc<tokio::sync::Notify>,
pub(super) event_rx: mpsc::UnboundedReceiver<ScreenEvent>,
pub(super) io_error_rx: mpsc::UnboundedReceiver<IoError>,
pub(super) last_sync_arrival_ms: Arc<AtomicU64>,
pub(super) epoch: Instant,
pub(super) virtual_cols: Option<u16>,
pub(super) key_callback: Option<KeyCallback>,
}
pub(super) fn build_common(
reader: Box<dyn Read + Send>,
writer: Box<dyn Write + Send>,
opts: &mut SessionOptions,
reader_done: Option<std::sync::mpsc::Sender<()>>,
) -> Result<CommonParts> {
let pty_size = opts.pty_size();
let parser_cols = opts.virtual_cols.unwrap_or(pty_size.cols);
let span = debug_span!(
"terminal_session_build",
rows = pty_size.rows,
cols = pty_size.cols,
parser_cols,
scrollback = opts.scrollback,
has_output_callback = opts.output_callback.is_some(),
has_input_callback = opts.input_callback.is_some(),
has_redraw_callback = opts.redraw_callback.is_some(),
);
let _guard = span.enter();
debug!("building terminal session internals");
let parser_size = TerminalSize {
rows: pty_size.rows,
cols: parser_cols,
};
let host_profile = Arc::new(opts.host_profile.take().unwrap_or_default());
let mut p = Parser::with_profile(
parser_size,
opts.scrollback as usize,
Arc::clone(&host_profile),
);
if opts.pixel_cell_size.width > 0 {
p.screen_mut().set_pixel_cell_size(opts.pixel_cell_size);
}
let parser = Arc::new(RwLock::new(p));
debug!("parser initialized");
let (io_error_tx, io_error_rx) = mpsc::unbounded_channel::<IoError>();
let (writer_tx, writer_handle) =
spawn_writer(writer, opts.input_callback.take(), io_error_tx.clone())?;
debug!("writer thread spawned");
let dirty = Arc::new(AtomicBool::new(false));
let redraw_notify = Arc::new(tokio::sync::Notify::new());
let epoch = Instant::now();
let last_sync_arrival_ms = Arc::new(AtomicU64::new(0));
let (event_tx, event_rx) = mpsc::unbounded_channel::<ScreenEvent>();
let reader_handle = match spawn_reader(ReaderContext {
reader,
parser: Arc::clone(&parser),
host_profile,
response_tx: writer_tx.clone(),
dirty: Arc::clone(&dirty),
redraw_notify: Arc::clone(&redraw_notify),
last_sync_arrival_ms: Arc::clone(&last_sync_arrival_ms),
epoch,
event_tx,
io_error_tx,
output_callback: opts.output_callback.take(),
redraw_callback: opts.redraw_callback.take(),
host_query_callback: Arc::clone(&opts.host_query_callback),
clipboard_policy: opts.clipboard_policy,
reader_done,
}) {
Ok(handle) => handle,
Err(err) => {
drop(writer_tx);
join_with_timeout(Some(writer_handle), JOIN_TIMEOUT);
return Err(err);
}
};
debug!("reader thread spawned");
Ok(CommonParts {
parser,
writer_tx,
reader_handle,
writer_handle,
dirty,
redraw_notify,
event_rx,
io_error_rx,
last_sync_arrival_ms,
epoch,
virtual_cols: opts.virtual_cols,
key_callback: opts.key_callback.take(),
})
}
pub(super) fn spawn_named<F>(name: &'static str, f: F) -> Result<JoinHandle<()>>
where
F: FnOnce() + Send + 'static,
{
#[cfg(test)]
if consume_thread_spawn_failure(name) {
return Err(Error::ThreadSpawn {
name,
source: std::io::Error::other("injected thread spawn failure"),
});
}
std::thread::Builder::new()
.name(name.into())
.spawn(f)
.map_err(|source| Error::ThreadSpawn { name, source })
}
#[cfg(test)]
thread_local! {
static THREAD_SPAWN_FAILURES: std::cell::RefCell<Vec<&'static str>> =
const { std::cell::RefCell::new(Vec::new()) };
}
#[cfg(test)]
pub(super) fn fail_next_thread_spawn(name: &'static str) {
THREAD_SPAWN_FAILURES.with(|failures| failures.borrow_mut().push(name));
}
#[cfg(test)]
fn consume_thread_spawn_failure(name: &'static str) -> bool {
THREAD_SPAWN_FAILURES.with(|failures| {
let mut failures = failures.borrow_mut();
let Some(pos) = failures.iter().position(|candidate| *candidate == name) else {
return false;
};
failures.remove(pos);
true
})
}
impl<M: TerminalOwnership> Terminal<M> {
pub(super) fn parser_read(&self) -> std::sync::RwLockReadGuard<'_, Parser> {
self.parser
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
pub(super) fn parser_write(&self) -> std::sync::RwLockWriteGuard<'_, Parser> {
self.parser
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
pub fn reply(&self, reply: crate::host_reply::HostReply) -> Result<()> {
self.send(&reply.encode())
}
pub async fn reply_async(&self, reply: crate::host_reply::HostReply) -> Result<()> {
self.send_async(&reply.encode()).await
}
pub fn send(&self, bytes: &[u8]) -> Result<()> {
let tx = self.writer_tx.as_ref().ok_or(Error::SendClosed)?;
match tx.try_send(Bytes::copy_from_slice(bytes)) {
Ok(()) => Ok(()),
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => Err(Error::SendQueueFull),
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => Err(Error::SendClosed),
}
}
pub fn send_key(&self, key: impl Into<KeyEvent>) -> Result<()> {
if let Some(bytes) = self.encode_key(key.into())? {
self.send(&bytes)?;
}
Ok(())
}
fn send_blocking(&self, bytes: &[u8]) -> Result<()> {
if tokio::runtime::Handle::try_current().is_ok() {
return Err(Error::BlockingInsideAsync);
}
let tx = self.writer_tx.as_ref().ok_or(Error::SendClosed)?;
tx.blocking_send(Bytes::copy_from_slice(bytes))
.map_err(|_err| Error::SendClosed)
}
pub fn paste_frame(&self, text: &str) -> Vec<u8> {
let bracketed = self
.parser_read()
.screen()
.mode(TerminalMode::BracketedPaste);
bracketed_paste(text, bracketed)
}
pub fn focus_frame(&self, gained: bool) -> Option<&'static [u8]> {
if self.parser_read().screen().mode(TerminalMode::FocusInOut) {
Some(focus_report(gained))
} else {
None
}
}
#[must_use]
pub fn writer(&self) -> Option<WriterHandle> {
self.writer_tx
.as_ref()
.map(|tx| WriterHandle::new(tx.clone()))
}
pub fn send_paste(&self, text: &str) -> Result<()> {
self.send_blocking(&self.paste_frame(text))
}
pub fn send_focus(&self, gained: bool) -> Result<()> {
if let Some(bytes) = self.focus_frame(gained) {
self.send_blocking(bytes)?;
}
Ok(())
}
pub fn send_mouse(&self, event: impl Into<MouseEvent>) -> Result<()> {
let event = event.into();
let enc = self.mouse_encoder();
if let Some(bytes) = enc.encode_mouse(&event) {
self.send(&bytes)?;
}
Ok(())
}
pub async fn send_async(&self, bytes: &[u8]) -> Result<()> {
let tx = self.writer_tx.as_ref().ok_or(Error::SendClosed)?;
tx.send(Bytes::copy_from_slice(bytes))
.await
.map_err(|_err| Error::SendClosed)
}
pub async fn send_key_async(&self, key: impl Into<KeyEvent>) -> Result<()> {
let encoded = self.encode_key(key.into())?;
if let Some(bytes) = encoded {
self.send_async(&bytes).await?;
}
Ok(())
}
pub async fn send_paste_async(&self, text: &str) -> Result<()> {
let frame = self.paste_frame(text);
self.send_async(&frame).await
}
pub async fn send_focus_async(&self, gained: bool) -> Result<()> {
if let Some(bytes) = self.focus_frame(gained) {
self.send_async(bytes).await?;
}
Ok(())
}
pub fn key_encoder(&self) -> KeyEncoder {
let mut enc = KeyEncoder::new();
let parser = self.parser_read();
enc.sync(parser.screen());
enc
}
pub fn mouse_encoder(&self) -> MouseEncoder {
let mut enc = MouseEncoder::new();
let parser = self.parser_read();
enc.sync(parser.screen());
enc
}
fn encode_key(&self, key: KeyEvent) -> Result<Option<Vec<u8>>> {
let enc = self.key_encoder();
if let Some(ref cb) = self.key_callback {
match cb(&key, *enc.screen_state()) {
KeyAction::Drop => return Ok(None),
KeyAction::Replace(bytes) => return Ok(Some(bytes)),
KeyAction::Send => {}
}
}
Ok(enc.encode_key(&key))
}
pub fn with_screen<F, R>(&self, f: F) -> R
where
F: FnOnce(&Screen) -> R,
{
let parser = self.parser_read();
f(parser.screen())
}
#[must_use]
pub fn size(&self) -> TerminalSize {
self.with_screen(Screen::size)
}
pub fn scroll_up(&self, count: usize) {
self.parser_write().screen_mut().scroll_up(count);
}
pub fn scroll_down(&self, count: usize) {
self.parser_write().screen_mut().scroll_down(count);
}
pub fn scroll_reset(&self) {
self.parser_write().screen_mut().scroll_reset();
}
pub fn scroll_to(&self, offset: usize) {
self.parser_write().screen_mut().scroll_to(offset);
}
#[must_use]
pub fn scrollback_offset(&self) -> usize {
self.parser_read().screen().scrollback()
}
#[must_use]
pub fn scrollback_available(&self) -> usize {
self.parser_read().screen().scrollback_available()
}
#[must_use]
pub fn any_mouse_mode(&self) -> bool {
let parser = self.parser_read();
let s = parser.screen();
s.mode(TerminalMode::MouseReportClick)
|| s.mode(TerminalMode::MouseReportCellMotion)
|| s.mode(TerminalMode::MouseReportAllMotion)
}
#[must_use]
pub fn kitty_keyboard_flags(&self) -> u8 {
self.parser_read().screen().kitty_keyboard_flags()
}
}
impl<M: TerminalOwnership> fmt::Debug for Terminal<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut s = f.debug_struct("Terminal");
if let Some(ref managed) = self.managed {
s.field("child_pid", &managed.child_pid);
s.field("finished", &poll_exit(managed).is_some());
}
s.finish_non_exhaustive()
}
}
impl<M: TerminalOwnership> Drop for Terminal<M> {
fn drop(&mut self) {
if let Some(ref managed) = self.managed {
let already_dead = poll_exit(managed).is_some();
if !already_dead {
if let Some(pid) = managed.child_pid {
let mut killer = managed.killer.lock().unwrap_or_else(|e| e.into_inner());
if let Err(err) = process_group::sigterm_group(pid, killer.as_mut()) {
warn!(
child_pid = pid,
error = %err,
"failed to send graceful termination to child on drop",
);
}
}
self.writer_tx.take();
wait_for_exit_or_deadline(managed, SHUTDOWN_TIMEOUT);
if poll_exit(managed).is_none()
&& let Some(pid) = managed.child_pid
{
let mut killer = managed.killer.lock().unwrap_or_else(|e| e.into_inner());
if let Err(err) = process_group::sigkill_group(pid, killer.as_mut()) {
warn!(
child_pid = pid,
error = %err,
"failed to force-kill child on drop",
);
}
}
} else {
self.writer_tx.take();
}
} else {
self.writer_tx.take();
}
join_with_timeout(self.reader_handle.take(), JOIN_TIMEOUT);
join_with_timeout(self.writer_handle.take(), JOIN_TIMEOUT);
}
}
const _: () = {
fn _assert_send<T: Send>() {}
fn _assert() {
_assert_send::<Terminal<Managed>>();
_assert_send::<Terminal<Unmanaged>>();
}
};
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
use std::time::Duration;
#[test]
fn parser_lock_recovers_from_poison() {
let session: Terminal<Unmanaged> = Terminal::from_reader_writer(
Box::new(Cursor::new(Vec::<u8>::new())),
Box::new(std::io::sink()),
SessionOptions::default(),
)
.expect("offline session build");
let parser = Arc::clone(&session.parser);
drop(
std::thread::spawn(move || {
let _guard = parser.write().unwrap();
panic!("intentional poison");
})
.join(),
);
assert!(
session.parser.is_poisoned(),
"writer panic must poison lock"
);
let _: () = session.with_screen(|_| ());
session.scroll_reset();
assert!(
session.parser.is_poisoned(),
"lock stays poisoned; recovery does not clear the flag",
);
}
#[test]
fn from_reader_writer_returns_thread_spawn_errors() {
fail_next_thread_spawn(io::WRITER_THREAD_NAME);
let Err(err) = Terminal::<Unmanaged>::from_reader_writer(
Box::new(Cursor::new(Vec::<u8>::new())),
Box::new(std::io::sink()),
SessionOptions::default(),
) else {
panic!("offline session build unexpectedly succeeded");
};
assert!(
matches!(
err,
Error::ThreadSpawn {
name: io::WRITER_THREAD_NAME,
..
}
),
"expected writer thread spawn failure, got {err:?}",
);
}
#[test]
fn send_blocking_rejects_inside_tokio_runtime() {
let session: Terminal<Unmanaged> = Terminal::from_reader_writer(
Box::new(Cursor::new(Vec::<u8>::new())),
Box::new(std::io::sink()),
SessionOptions::default(),
)
.expect("offline session build");
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
let err = rt.block_on(async { session.send_paste("x") }).unwrap_err();
assert!(
matches!(err, Error::BlockingInsideAsync),
"expected BlockingInsideAsync, got {err:?}",
);
}
struct FaultyReader {
sent_chunk: bool,
}
impl std::io::Read for FaultyReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if !self.sent_chunk {
self.sent_chunk = true;
let payload = b"hi";
let n = payload.len().min(buf.len());
buf[..n].copy_from_slice(&payload[..n]);
return Ok(n);
}
Err(std::io::Error::from(std::io::ErrorKind::PermissionDenied))
}
}
#[test]
fn reader_thread_surfaces_non_retryable_io_errors() {
let session: Terminal<Unmanaged> = Terminal::from_reader_writer(
Box::new(FaultyReader { sent_chunk: false }),
Box::new(std::io::sink()),
SessionOptions::default(),
)
.expect("offline session build");
let mut rx = session
.take_io_error_receiver()
.expect("I/O error receiver");
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()
.unwrap();
let received = rt
.block_on(async { tokio::time::timeout(Duration::from_secs(1), rx.recv()).await })
.expect("reader error did not arrive in time")
.expect("I/O error channel closed without sending");
let crate::IoError::Reader(received) = received else {
panic!("expected reader I/O error, got {received:?}");
};
assert_eq!(
received.kind,
std::io::ErrorKind::PermissionDenied,
"expected the originating io::ErrorKind to be exposed verbatim",
);
assert_eq!(received.source.kind(), std::io::ErrorKind::PermissionDenied);
let chained: &dyn std::error::Error = &received;
assert!(chained.source().is_some(), "source chain must be reachable");
}
#[test]
fn reader_thread_keeps_eof_silent() {
let session: Terminal<Unmanaged> = Terminal::from_reader_writer(
Box::new(Cursor::new(Vec::<u8>::new())),
Box::new(std::io::sink()),
SessionOptions::default(),
)
.expect("offline session build");
std::thread::sleep(Duration::from_millis(50));
assert!(
session.try_recv_io_error().is_none(),
"EOF must not surface as a reader error",
);
}
}