#![forbid(unsafe_code)]
use std::cell::Cell;
use std::env;
use std::io::{self, Write};
use std::sync::OnceLock;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use crate::event::Event;
use crate::terminal_capabilities::TerminalCapabilities;
#[cfg(feature = "tracing")]
use crate::logging::{info_span, warn};
#[cfg(not(feature = "tracing"))]
use crate::{info_span, warn};
static IO_READ_DURATION_SUM_US: AtomicU64 = AtomicU64::new(0);
static IO_READ_COUNT: AtomicU64 = AtomicU64::new(0);
static IO_WRITE_DURATION_SUM_US: AtomicU64 = AtomicU64::new(0);
static IO_WRITE_COUNT: AtomicU64 = AtomicU64::new(0);
static IO_FLUSH_DURATION_SUM_US: AtomicU64 = AtomicU64::new(0);
static IO_FLUSH_COUNT: AtomicU64 = AtomicU64::new(0);
thread_local! {
static PANIC_CLEANUP_SUPPRESS_DEPTH: Cell<u32> = const { Cell::new(0) };
}
const SIGNAL_SHUTDOWN_GRACE: Duration = Duration::from_secs(2);
const SIGNAL_SHUTDOWN_POLL: Duration = Duration::from_millis(10);
pub fn terminal_io_read_stats() -> (u64, u64) {
(
IO_READ_DURATION_SUM_US.load(Ordering::Relaxed),
IO_READ_COUNT.load(Ordering::Relaxed),
)
}
pub fn terminal_io_write_stats() -> (u64, u64) {
(
IO_WRITE_DURATION_SUM_US.load(Ordering::Relaxed),
IO_WRITE_COUNT.load(Ordering::Relaxed),
)
}
pub fn terminal_io_flush_stats() -> (u64, u64) {
(
IO_FLUSH_DURATION_SUM_US.load(Ordering::Relaxed),
IO_FLUSH_COUNT.load(Ordering::Relaxed),
)
}
fn wait_for_shutdown_ack() -> bool {
let deadline = std::time::Instant::now()
.checked_add(SIGNAL_SHUTDOWN_GRACE)
.unwrap_or_else(std::time::Instant::now);
loop {
if crate::shutdown_signal::pending_termination_signal().is_none() {
return true;
}
if std::time::Instant::now() >= deadline {
return false;
}
std::thread::sleep(SIGNAL_SHUTDOWN_POLL);
}
}
pub fn with_panic_cleanup_suppressed<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
#[cfg(panic = "abort")]
{
return f();
}
#[cfg(not(panic = "abort"))]
{
struct SuppressGuard;
impl Drop for SuppressGuard {
fn drop(&mut self) {
PANIC_CLEANUP_SUPPRESS_DEPTH.with(|depth| {
depth.set(depth.get().saturating_sub(1));
});
}
}
PANIC_CLEANUP_SUPPRESS_DEPTH.with(|depth| {
depth.set(depth.get().saturating_add(1));
});
let _guard = SuppressGuard;
f()
}
}
fn panic_cleanup_suppressed() -> bool {
PANIC_CLEANUP_SUPPRESS_DEPTH.with(|depth| depth.get() > 0)
}
fn to_std_duration(d: web_time::Duration) -> Duration {
Duration::from_micros(d.as_micros().min(u64::MAX as u128) as u64)
}
const SIZE_RETRY_DELAY: Duration = Duration::from_millis(10);
#[inline]
fn size_retry_delay(cx: &crate::cx::Cx) -> Option<Duration> {
if cx.is_done() {
return None;
}
match cx.remaining() {
Some(remaining) if to_std_duration(remaining) <= SIZE_RETRY_DELAY => None,
_ => Some(SIZE_RETRY_DELAY),
}
}
#[cfg_attr(not(feature = "tracing"), allow(dead_code))]
fn cx_deadline_remaining_us(cx: &crate::cx::Cx) -> u64 {
cx.remaining()
.map(|r| r.as_micros().min(u64::MAX as u128) as u64)
.unwrap_or(u64::MAX)
}
const KITTY_KEYBOARD_ENABLE: &[u8] = b"\x1b[>15u";
const KITTY_KEYBOARD_DISABLE: &[u8] = b"\x1b[<u";
const RESET_SCROLL_REGION: &[u8] = b"\x1b[r";
const RESET_STYLE: &[u8] = b"\x1b[0m";
const SYNC_END: &[u8] = b"\x1b[?2026l";
const MOUSE_ENABLE_SEQ: &[u8] = b"\x1b[?1001l\x1b[?1003l\x1b[?1005l\x1b[?1015l\x1b[?1016l\x1b[?1006;1000;1002h\x1b[?1006h\x1b[?1000h\x1b[?1002h";
const MOUSE_ENABLE_MUX_SAFE_SEQ: &[u8] =
b"\x1b[?1001l\x1b[?1003l\x1b[?1005l\x1b[?1015l\x1b[?1016l\x1b[?1006h\x1b[?1000h\x1b[?1002h";
const MOUSE_DISABLE_SEQ: &[u8] = b"\x1b[?1000;1002;1006l\x1b[?1000l\x1b[?1002l\x1b[?1006l\x1b[?1001l\x1b[?1003l\x1b[?1005l\x1b[?1015l\x1b[?1016l";
const MOUSE_DISABLE_MUX_SAFE_SEQ: &[u8] =
b"\x1b[?1016l\x1b[?1000l\x1b[?1002l\x1b[?1003l\x1b[?1006l\x1b[?1001l\x1b[?1005l\x1b[?1015l";
static TERMINAL_SESSION_ACTIVE: AtomicBool = AtomicBool::new(false);
#[derive(Debug)]
struct SessionLock;
impl SessionLock {
fn acquire() -> io::Result<Self> {
if TERMINAL_SESSION_ACTIVE
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
return Err(io::Error::other("TerminalSession already active"));
}
Ok(Self)
}
}
impl Drop for SessionLock {
fn drop(&mut self) {
TERMINAL_SESSION_ACTIVE.store(false, Ordering::SeqCst);
}
}
#[cfg(unix)]
use signal_hook::consts::signal::{SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGWINCH};
#[cfg(unix)]
use signal_hook::iterator::Signals;
#[derive(Debug, Clone)]
pub struct SessionOptions {
pub alternate_screen: bool,
pub mouse_capture: bool,
pub bracketed_paste: bool,
pub focus_events: bool,
pub kitty_keyboard: bool,
pub intercept_signals: bool,
}
impl Default for SessionOptions {
fn default() -> Self {
Self {
alternate_screen: false,
mouse_capture: false,
bracketed_paste: false,
focus_events: false,
kitty_keyboard: false,
intercept_signals: true,
}
}
}
#[inline]
fn sanitize_session_options(
mut requested: SessionOptions,
capabilities: &TerminalCapabilities,
) -> SessionOptions {
let focus_events_supported = capabilities.focus_events && !capabilities.in_any_mux();
let kitty_keyboard_supported = capabilities.kitty_keyboard && !capabilities.in_any_mux();
requested.mouse_capture = requested.mouse_capture && capabilities.mouse_sgr;
requested.bracketed_paste = requested.bracketed_paste && capabilities.bracketed_paste;
requested.focus_events = requested.focus_events && focus_events_supported;
requested.kitty_keyboard = requested.kitty_keyboard && kitty_keyboard_supported;
requested
}
#[derive(Debug)]
pub struct TerminalSession {
session_lock: Option<SessionLock>,
headless: bool,
options: SessionOptions,
alternate_screen_enabled: bool,
mouse_enabled: bool,
bracketed_paste_enabled: bool,
focus_events_enabled: bool,
kitty_keyboard_enabled: bool,
#[cfg(unix)]
signal_guard: Option<SignalGuard>,
}
impl TerminalSession {
#[inline]
fn mouse_enable_sequence_for_caps(caps: &TerminalCapabilities) -> &'static [u8] {
if caps.in_any_mux() {
MOUSE_ENABLE_MUX_SAFE_SEQ
} else {
MOUSE_ENABLE_SEQ
}
}
#[inline]
fn mouse_disable_sequence_for_caps(caps: &TerminalCapabilities) -> &'static [u8] {
if caps.in_any_mux() {
MOUSE_DISABLE_MUX_SAFE_SEQ
} else {
MOUSE_DISABLE_SEQ
}
}
pub fn new(options: SessionOptions) -> io::Result<Self> {
install_panic_hook();
let capabilities = TerminalCapabilities::with_overrides();
let options = sanitize_session_options(options, &capabilities);
let session_lock = SessionLock::acquire()?;
#[cfg(unix)]
let signal_guard = if options.intercept_signals {
Some(SignalGuard::new()?)
} else {
None
};
crossterm::terminal::enable_raw_mode()?;
#[cfg(feature = "tracing")]
tracing::info!("terminal raw mode enabled");
let mut session = Self {
session_lock: Some(session_lock),
headless: false,
options: options.clone(),
alternate_screen_enabled: false,
mouse_enabled: false,
bracketed_paste_enabled: false,
focus_events_enabled: false,
kitty_keyboard_enabled: false,
#[cfg(unix)]
signal_guard,
};
let mut stdout = io::stdout();
if options.alternate_screen {
session.alternate_screen_enabled = true;
crossterm::execute!(
stdout,
crossterm::terminal::EnterAlternateScreen,
crossterm::terminal::Clear(crossterm::terminal::ClearType::All),
crossterm::cursor::MoveTo(0, 0)
)?;
#[cfg(feature = "tracing")]
tracing::info!("alternate screen enabled (with clear)");
}
if options.mouse_capture {
session.mouse_enabled = true;
let enable_seq = Self::mouse_enable_sequence_for_caps(&capabilities);
stdout.write_all(enable_seq)?;
stdout.flush()?;
#[cfg(feature = "tracing")]
tracing::info!("mouse capture enabled");
}
if options.bracketed_paste {
session.bracketed_paste_enabled = true;
crossterm::execute!(stdout, crossterm::event::EnableBracketedPaste)?;
#[cfg(feature = "tracing")]
tracing::info!("bracketed paste enabled");
}
if options.focus_events {
session.focus_events_enabled = true;
crossterm::execute!(stdout, crossterm::event::EnableFocusChange)?;
#[cfg(feature = "tracing")]
tracing::info!("focus events enabled");
}
if options.kitty_keyboard {
session.kitty_keyboard_enabled = true;
Self::enable_kitty_keyboard(&mut stdout)?;
#[cfg(feature = "tracing")]
tracing::info!("kitty keyboard enabled");
}
Ok(session)
}
#[cfg(feature = "test-helpers")]
pub fn new_for_tests(options: SessionOptions) -> io::Result<Self> {
#[cfg(unix)]
let signal_guard = None;
Ok(Self {
session_lock: None,
headless: true,
options,
alternate_screen_enabled: false,
mouse_enabled: false,
bracketed_paste_enabled: false,
focus_events_enabled: false,
kitty_keyboard_enabled: false,
#[cfg(unix)]
signal_guard,
})
}
pub fn minimal() -> io::Result<Self> {
Self::new(SessionOptions::default())
}
pub fn size(&self) -> io::Result<(u16, u16)> {
let (w, h) = crossterm::terminal::size()?;
if w > 1 && h > 1 {
return Ok((w, h));
}
if let Some((env_w, env_h)) = size_from_env() {
return Ok((env_w, env_h));
}
std::thread::sleep(SIZE_RETRY_DELAY);
let (w2, h2) = crossterm::terminal::size()?;
if w2 > 1 && h2 > 1 {
return Ok((w2, h2));
}
let final_w = w2.max(2);
let final_h = h2.max(2);
Ok((final_w, final_h))
}
pub fn poll_event(&self, timeout: std::time::Duration) -> io::Result<bool> {
crossterm::event::poll(timeout)
}
pub fn poll_event_cx(
&self,
timeout: std::time::Duration,
cx: &crate::cx::Cx,
) -> io::Result<bool> {
let _span = info_span!(
"terminal.io",
op_type = "poll",
cx_deadline_remaining_us = cx_deadline_remaining_us(cx),
cx_cancelled = cx.is_cancelled()
);
let _guard = _span.enter();
if cx.is_done() {
return Ok(false);
}
let effective = match cx.remaining() {
Some(rem) => timeout.min(to_std_duration(rem)),
None => timeout,
};
let start = web_time::Instant::now();
let result = crossterm::event::poll(effective);
let elapsed_us = start.elapsed().as_micros().min(u64::MAX as u128) as u64;
IO_READ_DURATION_SUM_US.fetch_add(elapsed_us, Ordering::Relaxed);
IO_READ_COUNT.fetch_add(1, Ordering::Relaxed);
if cx.is_done() {
warn!("terminal.io poll completed after Cx deadline/cancellation");
}
result
}
pub fn read_event(&self) -> io::Result<Option<Event>> {
let event = crossterm::event::read()?;
Ok(Event::from_crossterm(event))
}
pub fn read_event_cx(&self, cx: &crate::cx::Cx) -> io::Result<Option<Event>> {
let _span = info_span!(
"terminal.io",
op_type = "read",
cx_deadline_remaining_us = cx_deadline_remaining_us(cx),
cx_cancelled = cx.is_cancelled()
);
let _guard = _span.enter();
if cx.is_done() {
return Ok(None);
}
let remaining = cx.remaining().unwrap_or(web_time::Duration::from_secs(60));
let timeout = to_std_duration(remaining);
let start = web_time::Instant::now();
let result = if crossterm::event::poll(timeout)? {
let event = crossterm::event::read()?;
Ok(Event::from_crossterm(event))
} else {
Ok(None)
};
let elapsed_us = start.elapsed().as_micros().min(u64::MAX as u128) as u64;
IO_READ_DURATION_SUM_US.fetch_add(elapsed_us, Ordering::Relaxed);
IO_READ_COUNT.fetch_add(1, Ordering::Relaxed);
if cx.is_done() {
warn!("terminal.io read completed after Cx deadline/cancellation");
}
result
}
pub fn show_cursor(&self) -> io::Result<()> {
crossterm::execute!(io::stdout(), crossterm::cursor::Show)
}
pub fn show_cursor_cx(&self, cx: &crate::cx::Cx) -> io::Result<()> {
let _span = info_span!(
"terminal.io",
op_type = "write",
cx_deadline_remaining_us = cx_deadline_remaining_us(cx),
cx_cancelled = cx.is_cancelled()
);
let _guard = _span.enter();
if cx.is_done() {
return Ok(());
}
let start = web_time::Instant::now();
let result = crossterm::execute!(io::stdout(), crossterm::cursor::Show);
let elapsed_us = start.elapsed().as_micros().min(u64::MAX as u128) as u64;
IO_WRITE_DURATION_SUM_US.fetch_add(elapsed_us, Ordering::Relaxed);
IO_WRITE_COUNT.fetch_add(1, Ordering::Relaxed);
if cx.is_done() {
warn!("terminal.io show_cursor completed after Cx deadline/cancellation");
}
result
}
pub fn hide_cursor(&self) -> io::Result<()> {
crossterm::execute!(io::stdout(), crossterm::cursor::Hide)
}
pub fn hide_cursor_cx(&self, cx: &crate::cx::Cx) -> io::Result<()> {
let _span = info_span!(
"terminal.io",
op_type = "write",
cx_deadline_remaining_us = cx_deadline_remaining_us(cx),
cx_cancelled = cx.is_cancelled()
);
let _guard = _span.enter();
if cx.is_done() {
return Ok(());
}
let start = web_time::Instant::now();
let result = crossterm::execute!(io::stdout(), crossterm::cursor::Hide);
let elapsed_us = start.elapsed().as_micros().min(u64::MAX as u128) as u64;
IO_WRITE_DURATION_SUM_US.fetch_add(elapsed_us, Ordering::Relaxed);
IO_WRITE_COUNT.fetch_add(1, Ordering::Relaxed);
if cx.is_done() {
warn!("terminal.io hide_cursor completed after Cx deadline/cancellation");
}
result
}
#[must_use]
pub fn mouse_capture_enabled(&self) -> bool {
self.mouse_enabled
}
pub fn set_mouse_capture(&mut self, enabled: bool) -> io::Result<()> {
let caps = TerminalCapabilities::with_overrides();
let mouse_supported = caps.mouse_sgr;
let enabled = enabled && mouse_supported;
if enabled == self.mouse_enabled {
self.options.mouse_capture = enabled;
return Ok(());
}
let mut stdout = io::stdout();
let write_result = if enabled {
let enable_seq = Self::mouse_enable_sequence_for_caps(&caps);
stdout.write_all(enable_seq).and_then(|_| stdout.flush())
} else {
let disable_seq = Self::mouse_disable_sequence_for_caps(&caps);
stdout.write_all(disable_seq).and_then(|_| stdout.flush())
};
if let Err(err) = write_result {
self.mouse_enabled = self.mouse_enabled || enabled;
self.options.mouse_capture = self.mouse_enabled;
return Err(err);
}
if enabled {
self.mouse_enabled = true;
self.options.mouse_capture = true;
#[cfg(feature = "tracing")]
tracing::info!("mouse capture enabled (runtime toggle)");
} else {
self.mouse_enabled = false;
self.options.mouse_capture = false;
#[cfg(feature = "tracing")]
tracing::info!("mouse capture disabled (runtime toggle)");
}
Ok(())
}
pub fn set_mouse_capture_cx(&mut self, enabled: bool, cx: &crate::cx::Cx) -> io::Result<()> {
let _span = info_span!(
"terminal.io",
op_type = "write",
cx_deadline_remaining_us = cx_deadline_remaining_us(cx),
cx_cancelled = cx.is_cancelled()
);
let _guard = _span.enter();
if cx.is_done() {
return Ok(());
}
let caps = TerminalCapabilities::with_overrides();
let mouse_supported = caps.mouse_sgr;
let enabled = enabled && mouse_supported;
if enabled == self.mouse_enabled {
self.options.mouse_capture = enabled;
return Ok(());
}
let start = web_time::Instant::now();
let mut stdout = io::stdout();
let result = if enabled {
let enable_seq = Self::mouse_enable_sequence_for_caps(&caps);
let r = stdout.write_all(enable_seq).and_then(|_| stdout.flush());
if r.is_ok() {
self.mouse_enabled = true;
self.options.mouse_capture = true;
} else {
self.mouse_enabled = self.mouse_enabled || enabled;
self.options.mouse_capture = self.mouse_enabled;
}
r
} else {
let disable_seq = Self::mouse_disable_sequence_for_caps(&caps);
let r = stdout.write_all(disable_seq).and_then(|_| stdout.flush());
if r.is_ok() {
self.mouse_enabled = false;
self.options.mouse_capture = false;
} else {
self.mouse_enabled = true;
self.options.mouse_capture = true;
}
r
};
let elapsed_us = start.elapsed().as_micros().min(u64::MAX as u128) as u64;
IO_WRITE_DURATION_SUM_US.fetch_add(elapsed_us, Ordering::Relaxed);
IO_WRITE_COUNT.fetch_add(1, Ordering::Relaxed);
if cx.is_done() {
warn!("terminal.io set_mouse_capture completed after Cx deadline/cancellation");
}
result
}
pub fn size_cx(&self, cx: &crate::cx::Cx) -> io::Result<(u16, u16)> {
let _span = info_span!(
"terminal.io",
op_type = "read",
cx_deadline_remaining_us = cx_deadline_remaining_us(cx),
cx_cancelled = cx.is_cancelled()
);
let _guard = _span.enter();
if cx.is_done() {
if let Some(env_size) = size_from_env() {
return Ok(env_size);
}
return Ok((2, 2));
}
let start = web_time::Instant::now();
let (w, h) = crossterm::terminal::size()?;
let elapsed_us = start.elapsed().as_micros().min(u64::MAX as u128) as u64;
IO_READ_DURATION_SUM_US.fetch_add(elapsed_us, Ordering::Relaxed);
IO_READ_COUNT.fetch_add(1, Ordering::Relaxed);
if w > 1 && h > 1 {
return Ok((w, h));
}
if let Some((env_w, env_h)) = size_from_env() {
return Ok((env_w, env_h));
}
let Some(retry_delay) = size_retry_delay(cx) else {
return Ok((w.max(2), h.max(2)));
};
std::thread::sleep(retry_delay);
let (w2, h2) = crossterm::terminal::size()?;
if w2 > 1 && h2 > 1 {
return Ok((w2, h2));
}
Ok((w2.max(2), h2.max(2)))
}
pub fn flush_cx(&self, cx: &crate::cx::Cx) -> io::Result<()> {
let _span = info_span!(
"terminal.io",
op_type = "flush",
cx_deadline_remaining_us = cx_deadline_remaining_us(cx),
cx_cancelled = cx.is_cancelled()
);
let _guard = _span.enter();
if cx.is_done() {
return Ok(());
}
let start = web_time::Instant::now();
let result = io::stdout().flush();
let elapsed_us = start.elapsed().as_micros().min(u64::MAX as u128) as u64;
IO_FLUSH_DURATION_SUM_US.fetch_add(elapsed_us, Ordering::Relaxed);
IO_FLUSH_COUNT.fetch_add(1, Ordering::Relaxed);
if cx.is_done() {
warn!("terminal.io flush completed after Cx deadline/cancellation");
}
result
}
pub fn options(&self) -> &SessionOptions {
&self.options
}
fn cleanup(&mut self) {
#[cfg(unix)]
let _ = self.signal_guard.take();
if self.headless {
self.alternate_screen_enabled = false;
self.mouse_enabled = false;
self.bracketed_paste_enabled = false;
self.focus_events_enabled = false;
self.kitty_keyboard_enabled = false;
let _ = self.session_lock.take();
return;
}
let mut stdout = io::stdout();
let caps = TerminalCapabilities::with_overrides();
let _ = stdout.write_all(RESET_SCROLL_REGION);
let _ = stdout.write_all(RESET_STYLE);
let _ = stdout.write_all(SYNC_END);
if self.kitty_keyboard_enabled {
let _ = Self::disable_kitty_keyboard(&mut stdout);
self.kitty_keyboard_enabled = false;
#[cfg(feature = "tracing")]
tracing::info!("kitty keyboard disabled");
}
if self.focus_events_enabled {
let _ = crossterm::execute!(stdout, crossterm::event::DisableFocusChange);
self.focus_events_enabled = false;
#[cfg(feature = "tracing")]
tracing::info!("focus events disabled");
}
if self.bracketed_paste_enabled {
let _ = crossterm::execute!(stdout, crossterm::event::DisableBracketedPaste);
self.bracketed_paste_enabled = false;
#[cfg(feature = "tracing")]
tracing::info!("bracketed paste disabled");
}
if self.mouse_enabled {
let _ = stdout.write_all(Self::mouse_disable_sequence_for_caps(&caps));
self.mouse_enabled = false;
#[cfg(feature = "tracing")]
tracing::info!("mouse capture disabled");
}
let _ = crossterm::execute!(stdout, crossterm::cursor::Show);
if self.alternate_screen_enabled {
let _ = crossterm::execute!(stdout, crossterm::terminal::LeaveAlternateScreen);
self.alternate_screen_enabled = false;
#[cfg(feature = "tracing")]
tracing::info!("alternate screen disabled");
}
let _ = crossterm::terminal::disable_raw_mode();
#[cfg(feature = "tracing")]
tracing::info!("terminal raw mode disabled");
let _ = stdout.flush();
let _ = self.session_lock.take();
}
fn enable_kitty_keyboard(writer: &mut impl Write) -> io::Result<()> {
writer.write_all(KITTY_KEYBOARD_ENABLE)?;
writer.flush()
}
fn disable_kitty_keyboard(writer: &mut impl Write) -> io::Result<()> {
writer.write_all(KITTY_KEYBOARD_DISABLE)?;
writer.flush()
}
}
impl Drop for TerminalSession {
fn drop(&mut self) {
self.cleanup();
}
}
fn size_from_env() -> Option<(u16, u16)> {
let cols = env::var("COLUMNS").ok()?.parse::<u16>().ok()?;
let rows = env::var("LINES").ok()?.parse::<u16>().ok()?;
if cols > 1 && rows > 1 {
Some((cols, rows))
} else {
None
}
}
fn install_panic_hook() {
static HOOK: OnceLock<()> = OnceLock::new();
HOOK.get_or_init(|| {
let previous = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
if !panic_cleanup_suppressed() {
best_effort_cleanup();
}
previous(info);
}));
});
}
pub fn best_effort_cleanup_for_exit() {
best_effort_cleanup();
}
fn best_effort_cleanup() {
let mut stdout = io::stdout();
let caps = TerminalCapabilities::with_overrides();
let _ = stdout.write_all(RESET_SCROLL_REGION);
let _ = stdout.write_all(RESET_STYLE);
let _ = stdout.write_all(SYNC_END);
if caps.kitty_keyboard && !caps.in_any_mux() {
let _ = TerminalSession::disable_kitty_keyboard(&mut stdout);
}
if caps.focus_events && !caps.in_any_mux() {
let _ = crossterm::execute!(stdout, crossterm::event::DisableFocusChange);
}
let _ = crossterm::execute!(stdout, crossterm::event::DisableBracketedPaste);
let _ = stdout.write_all(TerminalSession::mouse_disable_sequence_for_caps(&caps));
let _ = crossterm::execute!(stdout, crossterm::cursor::Show);
let _ = crossterm::execute!(stdout, crossterm::terminal::LeaveAlternateScreen);
let _ = crossterm::terminal::disable_raw_mode();
let _ = stdout.flush();
}
#[cfg(unix)]
#[derive(Debug)]
struct SignalGuard {
handle: signal_hook::iterator::Handle,
thread: Option<std::thread::JoinHandle<()>>,
}
#[cfg(unix)]
impl SignalGuard {
fn new() -> io::Result<Self> {
let mut signals =
Signals::new([SIGINT, SIGTERM, SIGHUP, SIGQUIT, SIGWINCH]).map_err(io::Error::other)?;
let handle = signals.handle();
let thread = std::thread::spawn(move || {
for signal in signals.forever() {
match signal {
SIGWINCH => {
#[cfg(feature = "tracing")]
tracing::debug!("SIGWINCH received");
}
SIGINT | SIGTERM | SIGHUP | SIGQUIT => {
#[cfg(feature = "tracing")]
tracing::warn!("termination signal received, cleaning up");
crate::shutdown_signal::record_pending_termination_signal(signal);
best_effort_cleanup();
if !wait_for_shutdown_ack() {
std::process::exit(128 + signal);
}
}
_ => {}
}
}
});
Ok(Self {
handle,
thread: Some(thread),
})
}
}
#[cfg(unix)]
impl Drop for SignalGuard {
fn drop(&mut self) {
self.handle.close();
if let Some(thread) = self.thread.take() {
let _ = thread.join();
}
}
}
#[doc(hidden)]
pub const _SPIKE_NOTES: () = ();
#[cfg(test)]
mod tests {
use super::*;
#[cfg(unix)]
use portable_pty::{CommandBuilder, PtySize};
#[cfg(unix)]
use std::io::{self, Read, Write};
#[cfg(unix)]
use std::sync::mpsc;
#[cfg(unix)]
use std::thread;
#[cfg(unix)]
use std::time::{Duration, Instant};
#[test]
fn session_options_default_is_minimal() {
let opts = SessionOptions::default();
assert!(!opts.alternate_screen);
assert!(!opts.mouse_capture);
assert!(!opts.bracketed_paste);
assert!(!opts.focus_events);
assert!(!opts.kitty_keyboard);
}
#[test]
fn session_options_clone() {
let opts = SessionOptions {
alternate_screen: true,
mouse_capture: true,
bracketed_paste: false,
focus_events: true,
kitty_keyboard: false,
intercept_signals: true,
};
let cloned = opts.clone();
assert_eq!(cloned.alternate_screen, opts.alternate_screen);
assert_eq!(cloned.mouse_capture, opts.mouse_capture);
assert_eq!(cloned.bracketed_paste, opts.bracketed_paste);
assert_eq!(cloned.focus_events, opts.focus_events);
assert_eq!(cloned.kitty_keyboard, opts.kitty_keyboard);
}
#[test]
fn session_options_debug() {
let opts = SessionOptions::default();
let debug = format!("{:?}", opts);
assert!(debug.contains("SessionOptions"));
assert!(debug.contains("alternate_screen"));
}
#[test]
fn kitty_keyboard_escape_sequences() {
assert_eq!(KITTY_KEYBOARD_ENABLE, b"\x1b[>15u");
assert_eq!(KITTY_KEYBOARD_DISABLE, b"\x1b[<u");
}
#[test]
fn mouse_enable_excludes_any_event_mode() {
assert!(
!MOUSE_ENABLE_SEQ
.windows(b"\x1b[?1003h".len())
.any(|w| w == b"\x1b[?1003h"),
"mouse enable must not include 1003 any-event mode"
);
assert!(
MOUSE_ENABLE_SEQ
.windows(b"\x1b[?1000h".len())
.any(|w| w == b"\x1b[?1000h"),
"mouse enable should include 1000 normal tracking"
);
assert!(
MOUSE_ENABLE_SEQ
.windows(b"\x1b[?1002h".len())
.any(|w| w == b"\x1b[?1002h"),
"mouse enable should include 1002 button-event tracking"
);
assert!(
MOUSE_ENABLE_SEQ
.windows(b"\x1b[?1006h".len())
.any(|w| w == b"\x1b[?1006h"),
"mouse enable should include 1006 SGR tracking"
);
let pos_1016l = MOUSE_ENABLE_SEQ
.windows(b"\x1b[?1016l".len())
.position(|w| w == b"\x1b[?1016l")
.expect("mouse enable should clear 1016 before enabling SGR");
let pos_1006h = MOUSE_ENABLE_SEQ
.windows(b"\x1b[?1006h".len())
.position(|w| w == b"\x1b[?1006h")
.expect("mouse enable should include 1006 SGR mode");
assert!(
pos_1016l < pos_1006h,
"1016l must be emitted before 1006h to preserve SGR mode on Ghostty-like terminals"
);
}
#[test]
fn mouse_enable_mux_safe_sequence_is_minimal() {
let mux_caps = TerminalCapabilities::builder()
.mouse_sgr(true)
.in_tmux(true)
.build();
assert_eq!(
TerminalSession::mouse_enable_sequence_for_caps(&mux_caps),
MOUSE_ENABLE_MUX_SAFE_SEQ
);
assert!(
MOUSE_ENABLE_MUX_SAFE_SEQ
.windows(b"\x1b[?1005l".len())
.any(|w| w == b"\x1b[?1005l"),
"mux-safe mouse enable must clear UTF-8 mouse encoding (1005)"
);
assert!(
MOUSE_ENABLE_MUX_SAFE_SEQ
.windows(b"\x1b[?1015l".len())
.any(|w| w == b"\x1b[?1015l"),
"mux-safe mouse enable must clear urxvt mouse encoding (1015)"
);
assert!(
MOUSE_ENABLE_MUX_SAFE_SEQ
.windows(b"\x1b[?1006h".len())
.any(|w| w == b"\x1b[?1006h"),
"mux-safe mouse enable must keep SGR mode"
);
assert!(
!MOUSE_ENABLE_MUX_SAFE_SEQ
.windows(b"\x1b[?1003h".len())
.any(|w| w == b"\x1b[?1003h"),
"mux-safe mouse enable must not include 1003 any-event mode"
);
let pos_1016l = MOUSE_ENABLE_MUX_SAFE_SEQ
.windows(b"\x1b[?1016l".len())
.position(|w| w == b"\x1b[?1016l")
.expect("mux-safe mouse enable should clear 1016 before enabling SGR");
let pos_1006h = MOUSE_ENABLE_MUX_SAFE_SEQ
.windows(b"\x1b[?1006h".len())
.position(|w| w == b"\x1b[?1006h")
.expect("mux-safe mouse enable should include 1006 SGR mode");
assert!(
pos_1016l < pos_1006h,
"mux-safe mouse enable must emit 1016l before 1006h to preserve SGR mode"
);
}
#[test]
fn mouse_disable_mux_safe_sequence_clears_1016() {
let mux_caps = TerminalCapabilities::builder()
.mouse_sgr(true)
.in_tmux(true)
.build();
assert_eq!(
TerminalSession::mouse_disable_sequence_for_caps(&mux_caps),
MOUSE_DISABLE_MUX_SAFE_SEQ
);
let pos_1016l = MOUSE_DISABLE_MUX_SAFE_SEQ
.windows(b"\x1b[?1016l".len())
.position(|w| w == b"\x1b[?1016l")
.expect("mux-safe mouse disable should clear 1016");
let pos_1006l = MOUSE_DISABLE_MUX_SAFE_SEQ
.windows(b"\x1b[?1006l".len())
.position(|w| w == b"\x1b[?1006l")
.expect("mux-safe mouse disable should disable 1006");
assert!(
pos_1016l < pos_1006l,
"mux-safe mouse disable should clear 1016 before dropping SGR mode"
);
}
#[test]
fn session_options_partial_config() {
let opts = SessionOptions {
alternate_screen: true,
mouse_capture: false,
bracketed_paste: true,
..Default::default()
};
assert!(opts.alternate_screen);
assert!(!opts.mouse_capture);
assert!(opts.bracketed_paste);
assert!(!opts.focus_events);
assert!(!opts.kitty_keyboard);
}
#[test]
fn sanitize_session_options_disables_unsupported_capabilities() {
let requested = SessionOptions {
alternate_screen: true,
mouse_capture: true,
bracketed_paste: true,
focus_events: true,
kitty_keyboard: true,
intercept_signals: true,
};
let caps = TerminalCapabilities::basic();
let sanitized = sanitize_session_options(requested, &caps);
assert!(sanitized.alternate_screen);
assert!(!sanitized.mouse_capture);
assert!(!sanitized.bracketed_paste);
assert!(!sanitized.focus_events);
assert!(!sanitized.kitty_keyboard);
}
#[test]
fn sanitize_session_options_is_conservative_in_wezterm_mux() {
let requested = SessionOptions {
alternate_screen: true,
mouse_capture: true,
bracketed_paste: true,
focus_events: true,
kitty_keyboard: true,
intercept_signals: true,
};
let caps = TerminalCapabilities::builder()
.mouse_sgr(true)
.bracketed_paste(true)
.focus_events(true)
.kitty_keyboard(true)
.in_wezterm_mux(true)
.build();
let sanitized = sanitize_session_options(requested, &caps);
assert!(sanitized.alternate_screen);
assert!(sanitized.mouse_capture);
assert!(sanitized.bracketed_paste);
assert!(!sanitized.focus_events);
assert!(!sanitized.kitty_keyboard);
}
#[test]
fn sanitize_session_options_is_conservative_in_tmux() {
let requested = SessionOptions {
alternate_screen: true,
mouse_capture: true,
bracketed_paste: true,
focus_events: true,
kitty_keyboard: true,
intercept_signals: true,
};
let caps = TerminalCapabilities::builder()
.mouse_sgr(true)
.bracketed_paste(true)
.focus_events(true)
.kitty_keyboard(true)
.in_tmux(true)
.build();
let sanitized = sanitize_session_options(requested, &caps);
assert!(sanitized.alternate_screen);
assert!(sanitized.mouse_capture);
assert!(sanitized.bracketed_paste);
assert!(!sanitized.focus_events);
assert!(!sanitized.kitty_keyboard);
}
#[cfg(unix)]
enum ReaderMsg {
Data(Vec<u8>),
Eof,
Err(std::io::Error),
}
#[cfg(unix)]
fn read_until_pattern(
rx: &mpsc::Receiver<ReaderMsg>,
captured: &mut Vec<u8>,
pattern: &[u8],
timeout: Duration,
) -> std::io::Result<()> {
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
let remaining = deadline.saturating_duration_since(Instant::now());
let wait = remaining.min(Duration::from_millis(50));
match rx.recv_timeout(wait) {
Ok(ReaderMsg::Data(chunk)) => {
captured.extend_from_slice(&chunk);
if captured.windows(pattern.len()).any(|w| w == pattern) {
return Ok(());
}
}
Ok(ReaderMsg::Eof) => break,
Ok(ReaderMsg::Err(err)) => return Err(err),
Err(mpsc::RecvTimeoutError::Timeout) => continue,
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
}
Err(std::io::Error::other(
"timeout waiting for PTY output marker",
))
}
#[cfg(unix)]
fn assert_contains_any(output: &[u8], options: &[&[u8]], label: &str) {
let found = options
.iter()
.any(|needle| output.windows(needle.len()).any(|w| w == *needle));
assert!(found, "expected cleanup sequence for {label}");
}
#[test]
fn kitty_keyboard_enable_writes_correct_sequence() {
let mut buf = Vec::new();
TerminalSession::enable_kitty_keyboard(&mut buf).unwrap();
assert_eq!(buf, b"\x1b[>15u");
}
#[test]
fn kitty_keyboard_disable_writes_correct_sequence() {
let mut buf = Vec::new();
TerminalSession::disable_kitty_keyboard(&mut buf).unwrap();
assert_eq!(buf, b"\x1b[<u");
}
#[test]
fn kitty_keyboard_roundtrip_writes_both_sequences() {
let mut buf = Vec::new();
TerminalSession::enable_kitty_keyboard(&mut buf).unwrap();
TerminalSession::disable_kitty_keyboard(&mut buf).unwrap();
assert_eq!(buf, b"\x1b[>15u\x1b[<u");
}
#[test]
fn session_options_all_enabled() {
let opts = SessionOptions {
alternate_screen: true,
mouse_capture: true,
bracketed_paste: true,
focus_events: true,
kitty_keyboard: true,
intercept_signals: true,
};
assert!(opts.alternate_screen);
assert!(opts.mouse_capture);
assert!(opts.bracketed_paste);
assert!(opts.focus_events);
assert!(opts.kitty_keyboard);
}
#[test]
fn session_options_debug_contains_all_fields() {
let opts = SessionOptions {
alternate_screen: true,
mouse_capture: false,
bracketed_paste: true,
focus_events: false,
kitty_keyboard: true,
intercept_signals: true,
};
let debug = format!("{opts:?}");
assert!(debug.contains("alternate_screen: true"), "{debug}");
assert!(debug.contains("mouse_capture: false"), "{debug}");
assert!(debug.contains("bracketed_paste: true"), "{debug}");
assert!(debug.contains("focus_events: false"), "{debug}");
assert!(debug.contains("kitty_keyboard: true"), "{debug}");
assert!(debug.contains("intercept_signals: true"), "{debug}");
}
#[test]
fn session_options_clone_independence() {
let opts = SessionOptions {
alternate_screen: true,
..Default::default()
};
let mut cloned = opts.clone();
cloned.alternate_screen = false;
assert!(opts.alternate_screen);
assert!(!cloned.alternate_screen);
}
#[cfg(feature = "test-helpers")]
#[test]
fn new_for_tests_default_options() {
let session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
assert!(!session.mouse_capture_enabled());
assert!(!session.alternate_screen_enabled);
assert!(!session.mouse_enabled);
assert!(!session.bracketed_paste_enabled);
assert!(!session.focus_events_enabled);
assert!(!session.kitty_keyboard_enabled);
}
#[cfg(feature = "test-helpers")]
#[test]
fn new_for_tests_preserves_options() {
let opts = SessionOptions {
alternate_screen: true,
mouse_capture: true,
bracketed_paste: true,
focus_events: true,
kitty_keyboard: true,
intercept_signals: true,
};
let session = TerminalSession::new_for_tests(opts).unwrap();
let stored = session.options();
assert!(stored.alternate_screen);
assert!(stored.mouse_capture);
assert!(stored.bracketed_paste);
assert!(stored.focus_events);
assert!(stored.kitty_keyboard);
}
#[cfg(feature = "test-helpers")]
#[test]
fn new_for_tests_flags_all_false_regardless_of_options() {
let opts = SessionOptions {
alternate_screen: true,
mouse_capture: true,
bracketed_paste: true,
focus_events: true,
kitty_keyboard: true,
intercept_signals: true,
};
let session = TerminalSession::new_for_tests(opts).unwrap();
assert!(!session.alternate_screen_enabled);
assert!(!session.mouse_enabled);
assert!(!session.bracketed_paste_enabled);
assert!(!session.focus_events_enabled);
assert!(!session.kitty_keyboard_enabled);
}
#[cfg(feature = "test-helpers")]
#[test]
fn new_for_tests_allows_multiple_sessions() {
let _a = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
let _b = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
}
#[cfg(feature = "test-helpers")]
#[test]
fn new_for_tests_marks_session_headless() {
let session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
assert!(session.headless);
}
#[cfg(feature = "test-helpers")]
#[test]
fn mouse_capture_enabled_getter() {
let session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
assert!(!session.mouse_capture_enabled());
}
#[cfg(feature = "test-helpers")]
#[test]
fn options_getter_returns_session_options() {
let opts = SessionOptions {
mouse_capture: true,
focus_events: true,
..Default::default()
};
let session = TerminalSession::new_for_tests(opts).unwrap();
assert!(session.options().mouse_capture);
assert!(session.options().focus_events);
assert!(!session.options().alternate_screen);
}
#[cfg(feature = "test-helpers")]
#[test]
fn set_mouse_capture_idempotent_disable() {
let mut session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
assert!(!session.mouse_capture_enabled());
session.set_mouse_capture(false).unwrap();
assert!(!session.mouse_capture_enabled());
}
#[cfg(feature = "test-helpers")]
#[test]
fn set_mouse_capture_enable_then_idempotent_enable() {
let _g = crate::capability_override::push_override(
crate::capability_override::CapabilityOverride::modern(),
);
let mut session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
session.set_mouse_capture(true).unwrap();
assert!(session.mouse_capture_enabled());
assert!(session.options().mouse_capture);
session.set_mouse_capture(true).unwrap();
assert!(session.mouse_capture_enabled());
}
#[cfg(feature = "test-helpers")]
#[test]
fn set_mouse_capture_toggle_roundtrip() {
let _g = crate::capability_override::push_override(
crate::capability_override::CapabilityOverride::modern(),
);
let mut session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
assert!(!session.mouse_capture_enabled());
session.set_mouse_capture(true).unwrap();
assert!(session.mouse_capture_enabled());
assert!(session.options().mouse_capture);
session.set_mouse_capture(false).unwrap();
assert!(!session.mouse_capture_enabled());
assert!(!session.options().mouse_capture);
}
#[cfg(feature = "test-helpers")]
#[test]
fn set_mouse_capture_multiple_toggles() {
let _g = crate::capability_override::push_override(
crate::capability_override::CapabilityOverride::modern(),
);
let mut session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
for _ in 0..5 {
session.set_mouse_capture(true).unwrap();
assert!(session.mouse_capture_enabled());
session.set_mouse_capture(false).unwrap();
assert!(!session.mouse_capture_enabled());
}
}
#[cfg(feature = "test-helpers")]
#[test]
fn cleanup_clears_all_flags() {
let mut session = TerminalSession::new_for_tests(SessionOptions {
alternate_screen: true,
mouse_capture: true,
bracketed_paste: true,
focus_events: true,
kitty_keyboard: true,
intercept_signals: true,
})
.unwrap();
session.alternate_screen_enabled = true;
session.mouse_enabled = true;
session.bracketed_paste_enabled = true;
session.focus_events_enabled = true;
session.kitty_keyboard_enabled = true;
session.cleanup();
assert!(!session.alternate_screen_enabled);
assert!(!session.mouse_enabled);
assert!(!session.bracketed_paste_enabled);
assert!(!session.focus_events_enabled);
assert!(!session.kitty_keyboard_enabled);
}
#[cfg(feature = "test-helpers")]
#[test]
fn cleanup_is_idempotent() {
let mut session = TerminalSession::new_for_tests(SessionOptions {
mouse_capture: true,
..Default::default()
})
.unwrap();
session.mouse_enabled = true;
session.cleanup();
assert!(!session.mouse_enabled);
session.cleanup();
assert!(!session.mouse_enabled);
}
#[cfg(feature = "test-helpers")]
#[test]
fn cleanup_only_disables_enabled_features() {
let mut session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
session.mouse_enabled = true;
session.cleanup();
assert!(!session.mouse_enabled);
assert!(!session.alternate_screen_enabled);
}
#[cfg(feature = "test-helpers")]
#[test]
fn cleanup_headless_session_clears_feature_flags() {
let mut session = TerminalSession::new_for_tests(SessionOptions {
alternate_screen: true,
mouse_capture: true,
bracketed_paste: true,
focus_events: true,
kitty_keyboard: true,
intercept_signals: true,
})
.unwrap();
session.alternate_screen_enabled = true;
session.mouse_enabled = true;
session.bracketed_paste_enabled = true;
session.focus_events_enabled = true;
session.kitty_keyboard_enabled = true;
session.cleanup();
assert!(!session.alternate_screen_enabled);
assert!(!session.mouse_enabled);
assert!(!session.bracketed_paste_enabled);
assert!(!session.focus_events_enabled);
assert!(!session.kitty_keyboard_enabled);
}
#[cfg(feature = "test-helpers")]
#[test]
fn session_debug_format() {
let session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
let debug = format!("{session:?}");
assert!(debug.contains("TerminalSession"), "{debug}");
assert!(debug.contains("mouse_enabled"), "{debug}");
assert!(debug.contains("alternate_screen_enabled"), "{debug}");
}
#[test]
fn panic_cleanup_suppression_scope_restores_state() {
assert!(
!panic_cleanup_suppressed(),
"suppression should start disabled"
);
with_panic_cleanup_suppressed(|| {
if cfg!(panic = "abort") {
assert!(
!panic_cleanup_suppressed(),
"abort profile must not suppress panic cleanup"
);
return;
}
assert!(panic_cleanup_suppressed(), "suppression should be enabled");
with_panic_cleanup_suppressed(|| {
assert!(
panic_cleanup_suppressed(),
"nested suppression should remain enabled"
);
});
assert!(
panic_cleanup_suppressed(),
"outer suppression should still be enabled after nested scope"
);
});
assert!(
!panic_cleanup_suppressed(),
"suppression should be disabled after scope exits"
);
}
#[cfg(unix)]
#[test]
fn terminal_session_panic_cleanup_idempotent() {
const MARKER: &[u8] = b"PANIC_CAUGHT";
const TEST_NAME: &str =
"terminal_session::tests::terminal_session_panic_cleanup_idempotent";
const ALT_SCREEN_EXIT_SEQS: &[&[u8]] = &[b"\x1b[?1049l", b"\x1b[?1047l"];
const MOUSE_DISABLE_SEQS: &[&[u8]] = &[
b"\x1b[?1000l\x1b[?1002l\x1b[?1006l",
b"\x1b[?1000;1002;1006l",
b"\x1b[?1000;1002l",
b"\x1b[?1000l",
];
const BRACKETED_PASTE_DISABLE_SEQS: &[&[u8]] = &[b"\x1b[?2004l"];
const FOCUS_DISABLE_SEQS: &[&[u8]] = &[b"\x1b[?1004l"];
const KITTY_DISABLE_SEQS: &[&[u8]] = &[b"\x1b[<u"];
const CURSOR_SHOW_SEQS: &[&[u8]] = &[b"\x1b[?25h"];
const SCROLL_REGION_RESET_SEQS: &[&[u8]] = &[b"\x1b[r"];
const STYLE_RESET_SEQS: &[&[u8]] = &[b"\x1b[0m"];
if std::env::var("FTUI_CORE_PANIC_CHILD").is_ok() {
let _ = std::panic::catch_unwind(|| {
let _session = TerminalSession::new(SessionOptions {
alternate_screen: true,
mouse_capture: true,
bracketed_paste: true,
focus_events: true,
kitty_keyboard: true,
intercept_signals: true,
})
.expect("TerminalSession::new should succeed in PTY");
panic!("intentional panic to exercise cleanup");
});
best_effort_cleanup_for_exit();
let _ = io::stdout().write_all(MARKER);
let _ = io::stdout().flush();
return;
}
let exe = std::env::current_exe().expect("current_exe");
let mut cmd = CommandBuilder::new(exe);
cmd.args(["--exact", TEST_NAME, "--nocapture"]);
cmd.env("FTUI_CORE_PANIC_CHILD", "1");
cmd.env("RUST_BACKTRACE", "0");
cmd.env("TERM", "xterm-256color");
cmd.env("FTUI_TEST_PROFILE", "modern");
cmd.env("TERM_PROGRAM", "WezTerm");
cmd.env_remove("TMUX");
cmd.env_remove("STY");
cmd.env_remove("ZELLIJ");
cmd.env_remove("WEZTERM_PANE");
let pty_system = portable_pty::native_pty_system();
let pair = pty_system
.openpty(PtySize {
rows: 24,
cols: 80,
pixel_width: 0,
pixel_height: 0,
})
.expect("openpty");
let mut child = pair.slave.spawn_command(cmd).expect("spawn PTY child");
drop(pair.slave);
let mut reader = pair.master.try_clone_reader().expect("clone PTY reader");
let _writer = pair.master.take_writer().expect("take PTY writer");
let (tx, rx) = mpsc::channel::<ReaderMsg>();
let reader_thread = thread::spawn(move || {
let mut buf = [0u8; 4096];
loop {
match reader.read(&mut buf) {
Ok(0) => {
let _ = tx.send(ReaderMsg::Eof);
break;
}
Ok(n) => {
let _ = tx.send(ReaderMsg::Data(buf[..n].to_vec()));
}
Err(err) => {
let _ = tx.send(ReaderMsg::Err(err));
break;
}
}
}
});
let mut captured = Vec::new();
read_until_pattern(&rx, &mut captured, MARKER, Duration::from_secs(5))
.expect("expected marker from child");
let status = child.wait().expect("child wait");
let _ = reader_thread.join();
assert!(status.success(), "child should exit successfully");
assert!(
captured.windows(MARKER.len()).any(|w| w == MARKER),
"expected panic marker in PTY output"
);
assert_contains_any(&captured, ALT_SCREEN_EXIT_SEQS, "alt-screen exit");
assert_contains_any(&captured, MOUSE_DISABLE_SEQS, "mouse disable");
assert_contains_any(
&captured,
BRACKETED_PASTE_DISABLE_SEQS,
"bracketed paste disable",
);
assert_contains_any(&captured, FOCUS_DISABLE_SEQS, "focus disable");
assert_contains_any(&captured, KITTY_DISABLE_SEQS, "kitty disable");
assert_contains_any(&captured, CURSOR_SHOW_SEQS, "cursor show");
assert_contains_any(&captured, SCROLL_REGION_RESET_SEQS, "scroll-region reset");
assert_contains_any(&captured, STYLE_RESET_SEQS, "style reset");
}
#[cfg(unix)]
#[test]
fn terminal_session_enforces_single_active_session() {
const MARKER: &[u8] = b"EXCLUSIVITY_OK";
const TEST_NAME: &str =
"terminal_session::tests::terminal_session_enforces_single_active_session";
if std::env::var("FTUI_CORE_EXCLUSIVITY_CHILD").is_ok() {
let session = TerminalSession::new(SessionOptions::default())
.expect("TerminalSession::new should succeed in PTY");
let err = TerminalSession::new(SessionOptions::default())
.expect_err("second TerminalSession::new should be rejected");
let msg = err.to_string();
assert!(
msg.contains("already active"),
"unexpected error message: {msg}"
);
drop(session);
let _session2 = TerminalSession::new(SessionOptions::default())
.expect("TerminalSession::new should succeed after previous session dropped");
let _ = io::stdout().write_all(MARKER);
let _ = io::stdout().flush();
return;
}
let exe = std::env::current_exe().expect("current_exe");
let mut cmd = CommandBuilder::new(exe);
cmd.args(["--exact", TEST_NAME, "--nocapture"]);
cmd.env("FTUI_CORE_EXCLUSIVITY_CHILD", "1");
cmd.env("RUST_BACKTRACE", "0");
cmd.env("TERM", "xterm-256color");
cmd.env("TERM_PROGRAM", "WezTerm");
cmd.env_remove("TMUX");
cmd.env_remove("STY");
cmd.env_remove("ZELLIJ");
cmd.env_remove("WEZTERM_PANE");
let pty_system = portable_pty::native_pty_system();
let pair = pty_system
.openpty(PtySize {
rows: 24,
cols: 80,
pixel_width: 0,
pixel_height: 0,
})
.expect("openpty");
let mut child = pair.slave.spawn_command(cmd).expect("spawn PTY child");
drop(pair.slave);
let mut reader = pair.master.try_clone_reader().expect("clone PTY reader");
let _writer = pair.master.take_writer().expect("take PTY writer");
let (tx, rx) = mpsc::channel::<ReaderMsg>();
let reader_thread = thread::spawn(move || {
let mut buf = [0u8; 4096];
loop {
match reader.read(&mut buf) {
Ok(0) => {
let _ = tx.send(ReaderMsg::Eof);
break;
}
Ok(n) => {
let _ = tx.send(ReaderMsg::Data(buf[..n].to_vec()));
}
Err(err) => {
let _ = tx.send(ReaderMsg::Err(err));
break;
}
}
}
});
let mut captured = Vec::new();
read_until_pattern(&rx, &mut captured, MARKER, Duration::from_secs(5))
.expect("expected marker from child");
let status = child.wait().expect("child wait");
let _ = reader_thread.join();
assert!(status.success(), "child should exit successfully");
assert!(
captured.windows(MARKER.len()).any(|w| w == MARKER),
"expected marker in PTY output"
);
}
#[test]
fn to_std_duration_converts_correctly() {
let d = web_time::Duration::from_millis(1234);
let std_d = super::to_std_duration(d);
assert_eq!(std_d, Duration::from_millis(1234));
}
#[test]
fn to_std_duration_zero() {
let d = web_time::Duration::from_secs(0);
let std_d = super::to_std_duration(d);
assert_eq!(std_d, Duration::ZERO);
}
#[test]
fn to_std_duration_large_value() {
let d = web_time::Duration::from_secs(86400);
let std_d = super::to_std_duration(d);
assert_eq!(std_d, Duration::from_secs(86400));
}
#[test]
fn cx_deadline_remaining_us_no_deadline() {
let (cx, _ctrl) = crate::cx::Cx::background();
let remaining = super::cx_deadline_remaining_us(&cx);
assert_eq!(remaining, u64::MAX);
}
#[test]
fn cx_deadline_remaining_us_with_deadline() {
let (cx, _ctrl) = crate::cx::Cx::with_deadline(web_time::Duration::from_millis(500));
let remaining = super::cx_deadline_remaining_us(&cx);
assert!(remaining <= 500_000, "remaining={remaining}");
assert!(remaining > 400_000, "remaining={remaining}");
}
#[test]
fn cx_deadline_remaining_us_cancelled() {
let (cx, ctrl) = crate::cx::Cx::background();
ctrl.cancel();
assert_eq!(super::cx_deadline_remaining_us(&cx), u64::MAX);
}
#[test]
fn cx_deadline_remaining_us_expired() {
let (cx, _ctrl) = crate::cx::Cx::with_deadline(web_time::Duration::from_nanos(1));
std::thread::sleep(Duration::from_millis(2));
let remaining = super::cx_deadline_remaining_us(&cx);
assert_eq!(remaining, 0);
}
#[test]
fn terminal_io_stats_functions_return_tuples() {
let (_sum, _count) = terminal_io_read_stats();
let (_sum_w, _count_w) = terminal_io_write_stats();
let (_sum_f, _count_f) = terminal_io_flush_stats();
}
#[test]
fn terminal_io_metrics_counters_are_monotonic() {
let (_, count_before) = terminal_io_read_stats();
let (_, count_after) = terminal_io_read_stats();
assert!(count_after >= count_before);
}
#[cfg(feature = "test-helpers")]
#[test]
fn poll_event_cx_returns_false_when_cancelled() {
let session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
let (cx, ctrl) = crate::cx::Cx::background();
ctrl.cancel();
let result = session.poll_event_cx(Duration::from_secs(10), &cx);
assert!(result.is_ok());
assert!(
!result.unwrap(),
"cancelled cx should return false immediately"
);
}
#[cfg(feature = "test-helpers")]
#[test]
fn poll_event_cx_returns_false_when_expired() {
let session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
let (cx, _ctrl) = crate::cx::Cx::with_deadline(web_time::Duration::from_nanos(1));
std::thread::sleep(Duration::from_millis(2));
let result = session.poll_event_cx(Duration::from_secs(10), &cx);
assert!(result.is_ok());
assert!(
!result.unwrap(),
"expired cx should return false immediately"
);
}
#[cfg(feature = "test-helpers")]
#[test]
fn read_event_cx_returns_none_when_cancelled() {
let session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
let (cx, ctrl) = crate::cx::Cx::background();
ctrl.cancel();
let result = session.read_event_cx(&cx);
assert!(result.is_ok());
assert!(result.unwrap().is_none(), "cancelled cx should return None");
}
#[cfg(feature = "test-helpers")]
#[test]
fn read_event_cx_returns_none_when_expired() {
let session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
let (cx, _ctrl) = crate::cx::Cx::with_deadline(web_time::Duration::from_nanos(1));
std::thread::sleep(Duration::from_millis(2));
let result = session.read_event_cx(&cx);
assert!(result.is_ok());
assert!(result.unwrap().is_none(), "expired cx should return None");
}
#[cfg(feature = "test-helpers")]
#[test]
fn show_cursor_cx_noop_when_cancelled() {
let session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
let (cx, ctrl) = crate::cx::Cx::background();
ctrl.cancel();
assert!(session.show_cursor_cx(&cx).is_ok());
}
#[cfg(feature = "test-helpers")]
#[test]
fn hide_cursor_cx_noop_when_cancelled() {
let session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
let (cx, ctrl) = crate::cx::Cx::background();
ctrl.cancel();
assert!(session.hide_cursor_cx(&cx).is_ok());
}
#[cfg(feature = "test-helpers")]
#[test]
fn flush_cx_noop_when_cancelled() {
let session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
let (cx, ctrl) = crate::cx::Cx::background();
ctrl.cancel();
assert!(session.flush_cx(&cx).is_ok());
}
#[cfg(feature = "test-helpers")]
#[test]
fn size_cx_returns_minimum_when_cancelled() {
let session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
let (cx, ctrl) = crate::cx::Cx::background();
ctrl.cancel();
let (w, h) = session.size_cx(&cx).unwrap();
assert!(w >= 2, "width={w}");
assert!(h >= 2, "height={h}");
}
#[cfg(feature = "test-helpers")]
#[test]
fn set_mouse_capture_cx_noop_when_cancelled() {
let mut session = TerminalSession::new_for_tests(SessionOptions::default()).unwrap();
let (cx, ctrl) = crate::cx::Cx::background();
ctrl.cancel();
assert!(session.set_mouse_capture_cx(true, &cx).is_ok());
assert!(!session.mouse_capture_enabled());
}
#[test]
fn cx_lab_deadline_remaining_us_deterministic() {
let clock = crate::cx::LabClock::new();
let (cx, _ctrl) =
crate::cx::Cx::lab_with_deadline(&clock, web_time::Duration::from_millis(100));
let remaining = super::cx_deadline_remaining_us(&cx);
assert!(remaining <= 100_000, "remaining={remaining}");
assert!(remaining > 90_000, "remaining={remaining}");
clock.advance(web_time::Duration::from_millis(50));
let remaining = super::cx_deadline_remaining_us(&cx);
assert!(remaining <= 50_000, "remaining={remaining}");
assert!(remaining > 40_000, "remaining={remaining}");
}
#[test]
fn size_retry_delay_uses_default_window_without_deadline() {
let (cx, _ctrl) = crate::cx::Cx::background();
assert_eq!(super::size_retry_delay(&cx), Some(SIZE_RETRY_DELAY));
}
#[test]
fn size_retry_delay_skips_when_cancelled() {
let (cx, ctrl) = crate::cx::Cx::background();
ctrl.cancel();
assert_eq!(super::size_retry_delay(&cx), None);
}
#[test]
fn size_retry_delay_skips_when_deadline_cannot_cover_retry() {
let clock = crate::cx::LabClock::new();
let (cx, _ctrl) =
crate::cx::Cx::lab_with_deadline(&clock, web_time::Duration::from_millis(10));
assert_eq!(super::size_retry_delay(&cx), None);
}
#[test]
fn size_retry_delay_allows_retry_when_budget_exceeds_delay() {
let clock = crate::cx::LabClock::new();
let (cx, _ctrl) =
crate::cx::Cx::lab_with_deadline(&clock, web_time::Duration::from_millis(25));
assert_eq!(super::size_retry_delay(&cx), Some(SIZE_RETRY_DELAY));
}
#[test]
fn pending_termination_signal_round_trip() {
crate::shutdown_signal::with_test_signal_serialization(|| {
crate::shutdown_signal::clear_pending_termination_signal();
assert_eq!(crate::shutdown_signal::pending_termination_signal(), None);
crate::shutdown_signal::record_pending_termination_signal(2);
assert_eq!(
crate::shutdown_signal::pending_termination_signal(),
Some(2)
);
crate::shutdown_signal::record_pending_termination_signal(15);
assert_eq!(
crate::shutdown_signal::pending_termination_signal(),
Some(2)
);
crate::shutdown_signal::clear_pending_termination_signal();
assert_eq!(crate::shutdown_signal::pending_termination_signal(), None);
});
}
}