use snafu::ResultExt as _;
use tracing::Instrument as _;
#[derive(Debug)]
struct WeztermConfig {
scrollback: usize,
}
impl wezterm_term::TerminalConfiguration for WeztermConfig {
fn scrollback_size(&self) -> usize {
self.scrollback
}
fn color_palette(&self) -> wezterm_term::color::ColorPalette {
wezterm_term::color::ColorPalette::default()
}
}
#[expect(
clippy::exhaustive_structs,
reason = "
I just really like the ability to specify config in a struct. As if it were JSON.
I know that means projects depending on this struct run the risk of unexpected
breakage when I add a new field. But maybe we can manage those expectations by
making sure that all example code is based off `ShadowTerminalConfig::default()`?
"
)]
pub struct Config {
pub width: u16,
pub height: u16,
pub command: Vec<std::ffi::OsString>,
pub scrollback_size: usize,
pub scrollback_step: usize,
}
impl Default for Config {
#[inline]
fn default() -> Self {
Self {
width: 100,
height: 30,
command: vec!["bash".into()],
scrollback_size: 1000,
scrollback_step: 5,
}
}
}
#[non_exhaustive]
pub struct Channels {
pub control_tx: tokio::sync::broadcast::Sender<crate::Protocol>,
pub output_tx: tokio::sync::mpsc::Sender<crate::pty::BytesFromPTY>,
pub output_rx: tokio::sync::mpsc::Receiver<crate::pty::BytesFromPTY>,
pub internal_input_tx: Option<tokio::sync::mpsc::Sender<crate::pty::BytesFromSTDIN>>,
shadow_output: tokio::sync::mpsc::Sender<crate::output::Output>,
}
#[non_exhaustive]
pub struct LastSent {
pub pty_sequence: usize,
pub pty_size: (usize, usize),
}
const CURSOR_POSITION_REQUEST: &str = "\x1b[6n";
const APPLICATION_MODE_START: &str = "\x1b[?1h";
const APPLICATION_MODE_END: &str = "\x1b[?1l";
const TIME_TO_WAIT_FOR_MORE_PTY_OUTPUT: u64 = 1000;
#[non_exhaustive]
pub struct ShadowTerminal {
pub terminal: wezterm_term::Terminal,
pub config: Config,
pub channels: Channels,
pub accumulated_pty_output: Vec<u8>,
pub wait_for_output_until: Option<tokio::time::Instant>,
pub scroll_position: usize,
pub last_sent: LastSent,
}
impl ShadowTerminal {
#[inline]
pub fn new(
config: Config,
shadow_output: tokio::sync::mpsc::Sender<crate::output::Output>,
) -> Self {
let (control_tx, _) = tokio::sync::broadcast::channel(64);
let (output_tx, output_rx) = tokio::sync::mpsc::channel(1);
tracing::debug!("Creating the in-memory Wezterm terminal");
let terminal = wezterm_term::Terminal::new(
Self::wezterm_size(config.width.into(), config.height.into()),
std::sync::Arc::new(WeztermConfig {
scrollback: config.scrollback_size,
}),
"Tattoy",
"O_o",
Box::<Vec<u8>>::default(),
);
let pty_size = (config.width.into(), config.height.into());
Self {
terminal,
config,
channels: Channels {
control_tx,
output_tx,
output_rx,
internal_input_tx: None,
shadow_output,
},
accumulated_pty_output: Vec::new(),
wait_for_output_until: None,
scroll_position: 0,
last_sent: LastSent {
pty_sequence: 0,
pty_size,
},
}
}
#[inline]
pub fn start(
&mut self,
user_input_rx: tokio::sync::mpsc::Receiver<crate::pty::BytesFromSTDIN>,
) -> tokio::task::JoinHandle<Result<(), crate::errors::PTYError>> {
let (internal_input_tx, internal_input_rx) = tokio::sync::mpsc::channel(1);
self.channels.internal_input_tx = Some(internal_input_tx);
let pty = crate::pty::PTY {
command: self.config.command.clone(),
width: self.config.width,
height: self.config.height,
control_tx: self.channels.control_tx.clone(),
output_tx: self.channels.output_tx.clone(),
};
let current_span = tracing::Span::current();
tokio::spawn(async move {
pty.run(user_input_rx, internal_input_rx)
.instrument(current_span)
.await
})
}
#[inline]
pub async fn run(
&mut self,
user_input_rx: tokio::sync::mpsc::Receiver<crate::pty::BytesFromSTDIN>,
) {
tracing::debug!("Starting Shadow Terminal loop...");
let mut control_rx = self.channels.control_tx.subscribe();
self.start(user_input_rx);
tracing::debug!("Starting Shadow Terminal main loop");
#[expect(
clippy::integer_division_remainder_used,
reason = "`tokio::select!` generates this."
)]
loop {
let is_wait = self.wait_for_output_until.is_some();
let wait_until = self.wait_for_output_until;
tokio::select! {
Some(bytes) = self.channels.output_rx.recv() => {
self.accumulate_pty_output(&bytes);
},
() = Self::wait_for_more_pty_output(wait_until), if is_wait => {
let result = self.handle_pty_output().await;
if let Err(error) = result {
tracing::error!("Handling PTY output: {error:?}");
}
}
Ok(message) = control_rx.recv() => {
self.handle_protocol_message(&message).await;
if matches!(message, crate::Protocol::End) {
break;
}
}
}
}
tracing::debug!("Shadow Terminal loop finished");
}
async fn wait_for_more_pty_output(maybe_wait_until: Option<tokio::time::Instant>) {
if let Some(wait_until) = maybe_wait_until {
tokio::time::sleep_until(wait_until).await;
}
}
fn accumulate_pty_output(&mut self, bytes: &crate::pty::BytesFromPTY) {
for byte in bytes {
if byte == &0 {
break;
}
self.accumulated_pty_output.push(*byte);
}
let next_output_broadcast = tokio::time::Instant::now()
+ tokio::time::Duration::from_micros(TIME_TO_WAIT_FOR_MORE_PTY_OUTPUT);
self.wait_for_output_until = Some(next_output_broadcast);
}
fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
haystack
.windows(needle.len())
.position(|window| window == needle)
}
pub(crate) async fn handle_pty_output(
&mut self,
) -> Result<(), crate::errors::ShadowTerminalError> {
let bytes_copy = self.accumulated_pty_output.clone();
let bytes = bytes_copy.as_slice();
if Self::find_subsequence(bytes, APPLICATION_MODE_START.as_bytes()).is_some() {
tracing::trace!("Starting terminal 'application mode'");
crate::output::raw_string_direct_to_terminal(APPLICATION_MODE_START)
.with_whatever_context(|err| {
format!("Sending 'application mode start' ANSI code: {err:?}")
})?;
}
if Self::find_subsequence(bytes, APPLICATION_MODE_END.as_bytes()).is_some() {
tracing::trace!("APPLICATION_MODE_END");
crate::output::raw_string_direct_to_terminal(APPLICATION_MODE_END)
.with_whatever_context(|err| {
format!("Sending 'application mode end' ANSI code: {err:?}")
})?;
}
self.handle_cursor_position_request(bytes).await?;
self.terminal.advance_bytes(bytes);
tracing::trace!("Wezterm shadow terminal advanced {} bytes", bytes.len());
let result = self.send_outputs().await;
if let Err(error) = result {
tracing::error!("{error:?}");
}
self.accumulated_pty_output.clear();
self.wait_for_output_until = None;
Ok(())
}
#[expect(
clippy::needless_pass_by_ref_mut,
reason = "
When I set this to `&self` then we get an actual compiler error that the `send()` method
on the channel is not safe because it's not `Send`. I don't understand this.
"
)]
async fn handle_cursor_position_request(
&mut self,
bytes: &[u8],
) -> Result<(), crate::errors::ShadowTerminalError> {
if Self::find_subsequence(bytes, CURSOR_POSITION_REQUEST.as_bytes()).is_none() {
return Ok(());
}
let mut payload: crate::pty::BytesFromSTDIN = [0; 128];
let cursor_position = self.terminal.cursor_pos();
let response_string = format!("\x1b[{};{}R", cursor_position.y, cursor_position.x);
let response_bytes = response_string.as_bytes();
for chunk in response_bytes.chunks(128) {
crate::pty::PTY::add_bytes_to_buffer(&mut payload, chunk).with_whatever_context(
|error| format!("Couldn't add response to payload buffer: {error:?}"),
)?;
if let Some(sender) = self.channels.internal_input_tx.as_ref() {
tracing::debug!(
"Responding to cursor position request with: {}",
response_string.replace('\x1b', "^")
);
let result = sender.send(payload).await;
if let Err(error) = result {
snafu::whatever!("Couldn't send internal input: {error:?}");
}
}
}
Ok(())
}
async fn send_outputs(&mut self) -> Result<(), crate::errors::ShadowTerminalError> {
let screen_output = self.build_current_output(&crate::output::SurfaceKind::Screen)?;
self.send_output(screen_output).await?;
if !self.terminal.is_alt_screen_active() {
let scrollback_output =
self.build_current_output(&crate::output::SurfaceKind::Scrollback)?;
self.send_output(scrollback_output).await?;
}
self.last_sent = LastSent {
pty_sequence: self.terminal.current_seqno(),
pty_size: (self.terminal.get_size().cols, self.terminal.get_size().rows),
};
Ok(())
}
#[expect(
clippy::needless_pass_by_ref_mut,
reason = "
Weirdly, we get the following error when `mut` is not used:
rustc: future cannot be sent between threads safely
within `shadow_terminal::ShadowTerminal`, the trait `std::marker::Sync` is not implemented for `std::cell::RefCell<termwiz::escape::parser::ParseState>`
if you want to do aliasing and mutation between multiple threads, use `std::sync::RwLock` instead
"
)]
async fn send_output(
&mut self,
output: crate::output::Output,
) -> Result<(), crate::errors::ShadowTerminalError> {
let result = self.channels.shadow_output.send(output).await;
if let Err(error) = result {
tracing::error!("Sending shadow output: {error:?}");
return Ok(());
}
Ok(())
}
#[inline]
pub fn kill(&self) -> Result<(), crate::errors::ShadowTerminalError> {
tracing::debug!("`ShadowTerminal.kill()` called");
self.channels
.control_tx
.send(crate::Protocol::End)
.with_whatever_context(|err| {
format!("Couldn't write bytes into PTY's STDIN: {err:?}")
})?;
Ok(())
}
async fn handle_protocol_message(&mut self, message: &crate::Protocol) {
tracing::debug!("Shadow Terminal received protocol message: {message:?}");
#[expect(clippy::wildcard_enum_match_arm, reason = "It's our internal protocol")]
match message {
crate::Protocol::Resize { width, height } => {
self.terminal.resize(Self::wezterm_size(
usize::from(*width),
usize::from(*height),
));
tracing::trace!("Wezterm terminal resized to: {width}x{height}");
}
crate::Protocol::Scroll(scroll) => {
match scroll {
crate::Scroll::Up => {
let size = self.terminal.get_size();
let total_lines = self.terminal.screen().scrollback_rows() - size.rows;
self.scroll_position += self.config.scrollback_step;
self.scroll_position = self.scroll_position.min(total_lines);
}
crate::Scroll::Down => {
if self.scroll_position < self.config.scrollback_step {
self.scroll_position = 0;
} else {
self.scroll_position -= self.config.scrollback_step;
}
}
crate::Scroll::Cancel => {
self.scroll_position = 0;
}
}
let result = self.send_outputs().await;
if let Err(error) = result {
tracing::error!("Couldn't send PTY output from shadow terminal: {error:?}");
}
}
_ => (),
}
}
const fn wezterm_size(width: usize, height: usize) -> wezterm_term::TerminalSize {
wezterm_term::TerminalSize {
cols: width,
rows: height,
pixel_width: 0,
pixel_height: 0,
dpi: 0,
}
}
#[inline]
pub fn resize(
&mut self,
width: u16,
height: u16,
) -> Result<(), tokio::sync::broadcast::error::SendError<crate::Protocol>> {
self.channels
.control_tx
.send(crate::Protocol::Resize { width, height })?;
self.terminal
.resize(Self::wezterm_size(width.into(), height.into()));
Ok(())
}
}
impl Drop for ShadowTerminal {
#[inline]
fn drop(&mut self) {
tracing::trace!("Running ShadowTerminal.drop()");
let result = self.kill();
if let Err(error) = result {
tracing::error!("{error:?}");
}
}
}