tastty 0.1.0

Embeddable pseudoterminal sessions for Rust applications
//! Reader/writer threads, [`WriterHandle`], and the channel-consumer
//! accessors on [`Terminal`].

use std::fmt;
use std::io::{Read, Write};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use bytes::Bytes;
use tastty_core::host_reply::{HostQuery, ReplyAction, auto_reply_bytes_for_query};
use tastty_core::{HostProfile, Parser, ScreenEvent, TerminalMode};
use tokio::sync::mpsc;
use tracing::warn;

use crate::error::{
    Error, IoError, IoErrorReceiver, ReaderError, Result, WriterError, WriterOperation,
};
use crate::osc_policy::{ClipboardPolicy, OscPolicy};

use super::Terminal;
use super::TerminalOwnership;
use super::options::{HostQueryCallback, InputCallback, OutputCallback, RedrawCallback};
use super::spawn_named;

pub(crate) const SYNC_TIMEOUT_MS: u64 = 200;
pub(crate) const WRITER_CHANNEL_CAPACITY: usize = 1024;
pub(crate) const PTY_READ_BUF_SIZE: usize = 8192;
pub(crate) const JOIN_TIMEOUT: Duration = Duration::from_millis(50);

/// Cheap cloneable handle that lets a caller send bytes to the child
/// without holding an outer lock around the [`Terminal`] across the
/// `.await`.
///
/// Obtained from [`Terminal::writer`]. A single [`WriterHandle::send`]
/// call queues one frame as a single mpsc message, so concurrent
/// senders cannot interleave bytes within a single frame; the bytes
/// between separate calls may interleave in any order.
///
/// Sends are cancellation-safe: dropping the future before completion
/// neither queues nor drops the bytes, matching the guarantee on
/// `tokio::sync::mpsc::Sender::send`.
#[derive(Clone)]
pub struct WriterHandle {
    tx: mpsc::Sender<Bytes>,
}

impl fmt::Debug for WriterHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WriterHandle").finish_non_exhaustive()
    }
}

impl WriterHandle {
    pub(crate) fn new(tx: mpsc::Sender<Bytes>) -> Self {
        Self { tx }
    }

    /// Send bytes to the child, awaiting if the bounded writer queue is
    /// full.
    ///
    /// Cancellation-safe; backed by `tokio::sync::mpsc::Sender::send`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::SendClosed`] when the writer side is gone.
    pub async fn send(&self, bytes: &[u8]) -> Result<()> {
        self.tx
            .send(Bytes::copy_from_slice(bytes))
            .await
            .map_err(|_err| Error::SendClosed)
    }
}

impl<M: TerminalOwnership> Terminal<M> {
    /// Returns the next pending screen event, if any.
    /// Events include clipboard operations (OSC 52) from inner applications
    /// and remain queued until drained by the caller.
    #[must_use]
    pub fn try_recv_event(&self) -> Option<ScreenEvent> {
        self.event_rx
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .as_mut()?
            .try_recv()
            .ok()
    }

    /// Takes ownership of the event receiver, returning it to the caller.
    ///
    /// Returns `None` if the receiver was already taken by a previous call.
    ///
    /// The caller can drive the receiver directly:
    /// - `receiver.recv().await` for async waiting
    /// - `receiver.try_recv()` for non-blocking polling
    /// - Wrap in `tokio_stream::wrappers::UnboundedReceiverStream` for a `Stream`
    ///
    /// After this call, [`try_recv_event`](Self::try_recv_event) will always
    /// return `None`.
    pub fn take_event_receiver(&self) -> Option<tokio::sync::mpsc::UnboundedReceiver<ScreenEvent>> {
        self.event_rx
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .take()
    }

    /// Returns a fatal reader- or writer-thread error if one has been observed.
    ///
    /// Each I/O thread emits at most one [`IoError`] before exiting. Reader
    /// EOF (the child closing the slave PTY) is silent and produces no value
    /// here.
    ///
    /// Returns `None` while I/O is healthy or after the child has exited
    /// cleanly. Returns `None` if
    /// [`take_io_error_receiver`](Self::take_io_error_receiver) has already
    /// moved the receiver out.
    #[must_use]
    pub fn try_recv_io_error(&self) -> Option<IoError> {
        self.io_error_rx
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .as_mut()?
            .try_recv()
            .ok()
    }

    /// Takes ownership of the I/O-error receiver, returning it to the caller.
    ///
    /// Returns `None` if the receiver was already taken by a previous call.
    ///
    /// The caller can drive the receiver directly:
    /// - `receiver.recv().await` for async waiting
    /// - `receiver.try_recv()` for non-blocking polling
    ///
    /// Each I/O thread emits at most one item before exiting. Reader EOF
    /// remains silent. After this call,
    /// [`try_recv_io_error`](Self::try_recv_io_error) will always return
    /// `None`.
    pub fn take_io_error_receiver(&self) -> Option<IoErrorReceiver> {
        self.io_error_rx
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .take()
    }

    /// Returns a Notify that fires when new output arrives from the PTY.
    /// Use with `notifier.notified().await` in async event loops.
    #[must_use]
    pub fn redraw_notifier(&self) -> Arc<tokio::sync::Notify> {
        Arc::clone(&self.redraw_notify)
    }

    /// Returns `true` if new output has arrived and the session is not
    /// mid-batch (or the sync timeout has expired). Clears the dirty flag.
    #[must_use]
    pub fn take_redraw(&self) -> bool {
        if self.parser_read().screen().mode(TerminalMode::SyncUpdate) {
            let arrival_ms = self.last_sync_arrival_ms.load(Ordering::Acquire);
            if arrival_ms == 0 {
                return false;
            }
            let now_ms = self.epoch.elapsed().as_millis() as u64;
            if now_ms.saturating_sub(arrival_ms) < SYNC_TIMEOUT_MS {
                return false;
            }
        }
        self.dirty.swap(false, Ordering::AcqRel)
    }
}

pub(super) struct ReaderContext {
    pub(super) reader: Box<dyn Read + Send>,
    pub(super) parser: Arc<RwLock<Parser>>,
    /// Captured at spawn so the reader thread can compute auto-reply bytes
    /// without holding the parser lock. `HostProfile` is built once in
    /// [`build_common`](super::build_common) and shared via `Arc`; nothing
    /// in this crate or `tastty-core` mutates it after construction.
    pub(super) host_profile: Arc<HostProfile>,
    pub(super) response_tx: mpsc::Sender<Bytes>,
    pub(super) dirty: Arc<AtomicBool>,
    pub(super) redraw_notify: Arc<tokio::sync::Notify>,
    pub(super) last_sync_arrival_ms: Arc<AtomicU64>,
    pub(super) epoch: Instant,
    pub(super) event_tx: mpsc::UnboundedSender<ScreenEvent>,
    pub(super) io_error_tx: mpsc::UnboundedSender<IoError>,
    pub(super) output_callback: Option<OutputCallback>,
    pub(super) redraw_callback: Option<RedrawCallback>,
    pub(super) host_query_callback: HostQueryCallback,
    pub(super) clipboard_policy: ClipboardPolicy,
    /// Held for the lifetime of the reader thread; drops when the thread
    /// exits (clean EOF, fatal read error, or panic). The managed-spawn
    /// child waiter blocks on the matching receiver before publishing
    /// the exit status, so [`try_wait`](Terminal::try_wait) cannot
    /// surface `Some(status)` while bytes are still in flight to the
    /// parser. `None` for the unmanaged path, which has no waiter and
    /// no caller relying on this ordering.
    pub(super) reader_done: Option<std::sync::mpsc::Sender<()>>,
}

/// Apply [`ClipboardPolicy`] to one drained [`ScreenEvent`].
///
/// Returns `true` if the event is allowed to flow to both
/// `auto_reply_bytes` and the embedder-facing event channel. Returns
/// `false` only for OSC 52 events whose target list contains at least
/// one denied buffer; non-clipboard events always pass.
///
/// Multi-target events are denied if any one target is denied, since
/// the OSC 52 wire form treats the target list as a single unit and
/// partial honor (writing to some buffers but not others) would
/// surprise both the program issuing the request and the embedder
/// auditing the policy.
fn clipboard_policy_allows(event: &ScreenEvent, policy: &ClipboardPolicy) -> bool {
    let (targets, direction) = match event {
        ScreenEvent::ClipboardQuery { targets } => (targets, &policy.read),
        ScreenEvent::ClipboardWrite { targets, .. } | ScreenEvent::ClipboardClear { targets } => {
            (targets, &policy.write)
        }
        _ => return true,
    };
    targets
        .iter()
        .all(|target| matches!(direction.for_target(*target), OscPolicy::Allow))
}

pub(super) const READER_THREAD_NAME: &str = "tastty-reader";
pub(super) const WRITER_THREAD_NAME: &str = "tastty-writer";

pub(super) fn spawn_reader(ctx: ReaderContext) -> Result<std::thread::JoinHandle<()>> {
    let ReaderContext {
        mut reader,
        parser,
        host_profile,
        response_tx,
        dirty,
        redraw_notify,
        last_sync_arrival_ms,
        epoch,
        event_tx,
        io_error_tx,
        output_callback,
        redraw_callback,
        host_query_callback,
        clipboard_policy,
        reader_done,
    } = ctx;
    spawn_named(READER_THREAD_NAME, move || {
        let _reader_done = reader_done;
        let mut buf = [0u8; PTY_READ_BUF_SIZE];
        loop {
            match reader.read(&mut buf) {
                // EOF: the slave side of the PTY closed. portable_pty
                // translates the Linux EIO-on-hangup into Ok(0) for us, so
                // this is the single canonical exit path for clean child
                // termination. Stay silent: embedders that already wait on
                // child exit do not need a second notification.
                Ok(0) => break,
                Ok(n) => {
                    if let Some(ref cb) = output_callback {
                        cb(&buf[..n]);
                    }
                    let (sync, mut events) = {
                        let mut p = parser.write().unwrap_or_else(|e| e.into_inner());
                        p.process(&buf[..n]);
                        let sync = p.screen().mode(TerminalMode::SyncUpdate);
                        let events = p.screen_mut().drain_events();
                        (sync, events)
                    };
                    events.retain(|event| clipboard_policy_allows(event, &clipboard_policy));
                    // Order: send all auto-replies first, then surface
                    // events to the embedder. The auto_response.rs
                    // integration test pins this: the guest program's
                    // `read -t 2` after a query must observe the reply
                    // before any subsequent events surface.
                    let resp: Vec<u8> = events
                        .iter()
                        .filter_map(HostQuery::from_event)
                        .filter_map(|q| match host_query_callback(&q, &host_profile) {
                            ReplyAction::Send => {
                                Some(auto_reply_bytes_for_query(&q, &host_profile))
                            }
                            ReplyAction::Replace(reply) => Some(reply.encode()),
                            ReplyAction::Drop => None,
                            _ => None,
                        })
                        .flatten()
                        .collect();

                    if let Some(ref cb) = redraw_callback {
                        cb();
                    }
                    dirty.store(true, Ordering::Release);
                    redraw_notify.notify_one();
                    if sync {
                        let ms = epoch.elapsed().as_millis() as u64;
                        last_sync_arrival_ms.store(ms, Ordering::Release);
                    } else {
                        last_sync_arrival_ms.store(0, Ordering::Release);
                    }

                    if !resp.is_empty() && response_tx.blocking_send(Bytes::from(resp)).is_err() {
                        warn!("dropping parser response because writer thread is gone");
                        break;
                    }
                    for event in events {
                        if event_tx.send(event).is_err() {
                            warn!("dropping screen event because receiver is gone");
                            break;
                        }
                    }
                }
                // EINTR / spurious nonblocking wake: retry the same read.
                // portable_pty does not loop on Interrupted internally, and
                // a future caller might flip O_NONBLOCK on the master fd,
                // so handle WouldBlock defensively too.
                Err(e)
                    if e.kind() == std::io::ErrorKind::Interrupted
                        || e.kind() == std::io::ErrorKind::WouldBlock => {}
                Err(e) => {
                    warn!(
                        kind = ?e.kind(),
                        error = %e,
                        "PTY reader thread exiting on read error",
                    );
                    let kind = e.kind();
                    drop(io_error_tx.send(IoError::Reader(ReaderError { kind, source: e })));
                    break;
                }
            }
        }
    })
}

pub(super) fn spawn_writer(
    mut writer: Box<dyn Write + Send>,
    input_callback: Option<InputCallback>,
    io_error_tx: mpsc::UnboundedSender<IoError>,
) -> Result<(mpsc::Sender<Bytes>, std::thread::JoinHandle<()>)> {
    let (tx, mut rx) = mpsc::channel::<Bytes>(WRITER_CHANNEL_CAPACITY);
    let handle = spawn_named(WRITER_THREAD_NAME, move || {
        while let Some(data) = rx.blocking_recv() {
            if let Err(source) = writer.write_all(&data) {
                let kind = source.kind();
                drop(io_error_tx.send(IoError::Writer(WriterError {
                    operation: WriterOperation::Write,
                    kind,
                    source,
                })));
                break;
            }
            if let Err(source) = writer.flush() {
                let kind = source.kind();
                drop(io_error_tx.send(IoError::Writer(WriterError {
                    operation: WriterOperation::Flush,
                    kind,
                    source,
                })));
                break;
            }
            if let Some(ref cb) = input_callback {
                cb(&data);
            }
        }
    })?;
    Ok((tx, handle))
}

pub(super) fn join_with_timeout(handle: Option<std::thread::JoinHandle<()>>, timeout: Duration) {
    let Some(handle) = handle else { return };

    let deadline = Instant::now() + timeout;
    let mut backoff = Duration::from_micros(50);
    let max_backoff = Duration::from_millis(2);
    while !handle.is_finished() {
        let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
            // Intentional detach: the worker is parked in an OS read or
            // channel recv that Drop cannot interrupt; the PTY master
            // close (after this Drop returns) unblocks it.
            warn!(
                timeout_ms = timeout.as_millis(),
                "thread join timed out; detaching"
            );
            return;
        };
        std::thread::sleep(backoff.min(remaining));
        backoff = (backoff * 2).min(max_backoff);
    }
    drop(handle.join());
}