use std::fmt;
use std::io::{Read, Write};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use bytes::Bytes;
use tastty_core::host_reply::{HostQuery, ReplyAction, auto_reply_bytes_for_query};
use tastty_core::{HostProfile, Parser, ScreenEvent, TerminalMode};
use tokio::sync::mpsc;
use tracing::warn;
use crate::error::{
Error, IoError, IoErrorReceiver, ReaderError, Result, WriterError, WriterOperation,
};
use crate::osc_policy::{ClipboardPolicy, OscPolicy};
use super::Terminal;
use super::TerminalOwnership;
use super::options::{HostQueryCallback, InputCallback, OutputCallback, RedrawCallback};
use super::spawn_named;
pub(crate) const SYNC_TIMEOUT_MS: u64 = 200;
pub(crate) const WRITER_CHANNEL_CAPACITY: usize = 1024;
pub(crate) const PTY_READ_BUF_SIZE: usize = 8192;
pub(crate) const JOIN_TIMEOUT: Duration = Duration::from_millis(50);
#[derive(Clone)]
pub struct WriterHandle {
tx: mpsc::Sender<Bytes>,
}
impl fmt::Debug for WriterHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WriterHandle").finish_non_exhaustive()
}
}
impl WriterHandle {
pub(crate) fn new(tx: mpsc::Sender<Bytes>) -> Self {
Self { tx }
}
pub async fn send(&self, bytes: &[u8]) -> Result<()> {
self.tx
.send(Bytes::copy_from_slice(bytes))
.await
.map_err(|_err| Error::SendClosed)
}
}
impl<M: TerminalOwnership> Terminal<M> {
#[must_use]
pub fn try_recv_event(&self) -> Option<ScreenEvent> {
self.event_rx
.lock()
.unwrap_or_else(|e| e.into_inner())
.as_mut()?
.try_recv()
.ok()
}
pub fn take_event_receiver(&self) -> Option<tokio::sync::mpsc::UnboundedReceiver<ScreenEvent>> {
self.event_rx
.lock()
.unwrap_or_else(|e| e.into_inner())
.take()
}
#[must_use]
pub fn try_recv_io_error(&self) -> Option<IoError> {
self.io_error_rx
.lock()
.unwrap_or_else(|e| e.into_inner())
.as_mut()?
.try_recv()
.ok()
}
pub fn take_io_error_receiver(&self) -> Option<IoErrorReceiver> {
self.io_error_rx
.lock()
.unwrap_or_else(|e| e.into_inner())
.take()
}
#[must_use]
pub fn redraw_notifier(&self) -> Arc<tokio::sync::Notify> {
Arc::clone(&self.redraw_notify)
}
#[must_use]
pub fn take_redraw(&self) -> bool {
if self.parser_read().screen().mode(TerminalMode::SyncUpdate) {
let arrival_ms = self.last_sync_arrival_ms.load(Ordering::Acquire);
if arrival_ms == 0 {
return false;
}
let now_ms = self.epoch.elapsed().as_millis() as u64;
if now_ms.saturating_sub(arrival_ms) < SYNC_TIMEOUT_MS {
return false;
}
}
self.dirty.swap(false, Ordering::AcqRel)
}
}
pub(super) struct ReaderContext {
pub(super) reader: Box<dyn Read + Send>,
pub(super) parser: Arc<RwLock<Parser>>,
pub(super) host_profile: Arc<HostProfile>,
pub(super) response_tx: mpsc::Sender<Bytes>,
pub(super) dirty: Arc<AtomicBool>,
pub(super) redraw_notify: Arc<tokio::sync::Notify>,
pub(super) last_sync_arrival_ms: Arc<AtomicU64>,
pub(super) epoch: Instant,
pub(super) event_tx: mpsc::UnboundedSender<ScreenEvent>,
pub(super) io_error_tx: mpsc::UnboundedSender<IoError>,
pub(super) output_callback: Option<OutputCallback>,
pub(super) redraw_callback: Option<RedrawCallback>,
pub(super) host_query_callback: HostQueryCallback,
pub(super) clipboard_policy: ClipboardPolicy,
pub(super) reader_done: Option<std::sync::mpsc::Sender<()>>,
}
fn clipboard_policy_allows(event: &ScreenEvent, policy: &ClipboardPolicy) -> bool {
let (targets, direction) = match event {
ScreenEvent::ClipboardQuery { targets } => (targets, &policy.read),
ScreenEvent::ClipboardWrite { targets, .. } | ScreenEvent::ClipboardClear { targets } => {
(targets, &policy.write)
}
_ => return true,
};
targets
.iter()
.all(|target| matches!(direction.for_target(*target), OscPolicy::Allow))
}
pub(super) const READER_THREAD_NAME: &str = "tastty-reader";
pub(super) const WRITER_THREAD_NAME: &str = "tastty-writer";
pub(super) fn spawn_reader(ctx: ReaderContext) -> Result<std::thread::JoinHandle<()>> {
let ReaderContext {
mut reader,
parser,
host_profile,
response_tx,
dirty,
redraw_notify,
last_sync_arrival_ms,
epoch,
event_tx,
io_error_tx,
output_callback,
redraw_callback,
host_query_callback,
clipboard_policy,
reader_done,
} = ctx;
spawn_named(READER_THREAD_NAME, move || {
let _reader_done = reader_done;
let mut buf = [0u8; PTY_READ_BUF_SIZE];
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
if let Some(ref cb) = output_callback {
cb(&buf[..n]);
}
let (sync, mut events) = {
let mut p = parser.write().unwrap_or_else(|e| e.into_inner());
p.process(&buf[..n]);
let sync = p.screen().mode(TerminalMode::SyncUpdate);
let events = p.screen_mut().drain_events();
(sync, events)
};
events.retain(|event| clipboard_policy_allows(event, &clipboard_policy));
let resp: Vec<u8> = events
.iter()
.filter_map(HostQuery::from_event)
.filter_map(|q| match host_query_callback(&q, &host_profile) {
ReplyAction::Send => {
Some(auto_reply_bytes_for_query(&q, &host_profile))
}
ReplyAction::Replace(reply) => Some(reply.encode()),
ReplyAction::Drop => None,
_ => None,
})
.flatten()
.collect();
if let Some(ref cb) = redraw_callback {
cb();
}
dirty.store(true, Ordering::Release);
redraw_notify.notify_one();
if sync {
let ms = epoch.elapsed().as_millis() as u64;
last_sync_arrival_ms.store(ms, Ordering::Release);
} else {
last_sync_arrival_ms.store(0, Ordering::Release);
}
if !resp.is_empty() && response_tx.blocking_send(Bytes::from(resp)).is_err() {
warn!("dropping parser response because writer thread is gone");
break;
}
for event in events {
if event_tx.send(event).is_err() {
warn!("dropping screen event because receiver is gone");
break;
}
}
}
Err(e)
if e.kind() == std::io::ErrorKind::Interrupted
|| e.kind() == std::io::ErrorKind::WouldBlock => {}
Err(e) => {
warn!(
kind = ?e.kind(),
error = %e,
"PTY reader thread exiting on read error",
);
let kind = e.kind();
drop(io_error_tx.send(IoError::Reader(ReaderError { kind, source: e })));
break;
}
}
}
})
}
pub(super) fn spawn_writer(
mut writer: Box<dyn Write + Send>,
input_callback: Option<InputCallback>,
io_error_tx: mpsc::UnboundedSender<IoError>,
) -> Result<(mpsc::Sender<Bytes>, std::thread::JoinHandle<()>)> {
let (tx, mut rx) = mpsc::channel::<Bytes>(WRITER_CHANNEL_CAPACITY);
let handle = spawn_named(WRITER_THREAD_NAME, move || {
while let Some(data) = rx.blocking_recv() {
if let Err(source) = writer.write_all(&data) {
let kind = source.kind();
drop(io_error_tx.send(IoError::Writer(WriterError {
operation: WriterOperation::Write,
kind,
source,
})));
break;
}
if let Err(source) = writer.flush() {
let kind = source.kind();
drop(io_error_tx.send(IoError::Writer(WriterError {
operation: WriterOperation::Flush,
kind,
source,
})));
break;
}
if let Some(ref cb) = input_callback {
cb(&data);
}
}
})?;
Ok((tx, handle))
}
pub(super) fn join_with_timeout(handle: Option<std::thread::JoinHandle<()>>, timeout: Duration) {
let Some(handle) = handle else { return };
let deadline = Instant::now() + timeout;
let mut backoff = Duration::from_micros(50);
let max_backoff = Duration::from_millis(2);
while !handle.is_finished() {
let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
warn!(
timeout_ms = timeout.as_millis(),
"thread join timed out; detaching"
);
return;
};
std::thread::sleep(backoff.min(remaining));
backoff = (backoff * 2).min(max_backoff);
}
drop(handle.join());
}