tastty 0.1.0

Embeddable pseudoterminal sessions for Rust applications
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
//! Managed sessions — a [`Terminal`] that spawns and owns a child process
//! on a fresh PTY — plus the [`ExitChannel`] that delivers the child's exit
//! to both async and synchronous waiters.

use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

use portable_pty::{ChildKiller, MasterPty, PtySize, native_pty_system};
use tastty_core::{TerminalMode, TerminalSize};
use tokio::sync::Notify;
use tracing::{debug, info, info_span};

use crate::Builder;
use crate::error::{Error, Result};
use crate::exit_status::ExitStatus;
use crate::process_group;

use super::Managed;
use super::Terminal;
use super::build_common;
use super::spawn_named;

/// Name given to the background thread that blocks in `child.wait()` and
/// then publishes the child's exit through the session's exit channel.
const CHILD_WAITER_THREAD_NAME: &str = "tastty-child-waiter";

/// Upper bound on how long [`Terminal::kill`] waits after SIGTERM for the
/// child's exit to be published before escalating to SIGKILL. Exported to
/// the parent module — NOTE(review): presumably for the `Drop` shutdown
/// path; confirm there.
pub(super) const SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(100);

/// State carried only by a managed session (one that spawned its own
/// child): the PTY master, the child's PID, a handle for signalling the
/// child, and the channel the child's exit is published on.
///
/// NOTE(review): fields drop in declaration order, so the PTY master is
/// closed before the killer and exit channel are released. Confirm nothing
/// depends on that before reordering fields.
pub(crate) struct ManagedState {
    /// Wrapped in a [`Mutex`] so `Terminal: Sync`: `portable_pty`'s
    /// `MasterPty` trait object is only bound `+ Send`, never `+ Sync`,
    /// so a bare `Box<dyn MasterPty + Send>` would block `&Terminal`
    /// from being shared across threads. The lock is only ever taken
    /// for the rare [`resize`](Terminal::resize) ioctl call; nothing on
    /// the hot path touches it.
    pub(super) pty_master: Mutex<Box<dyn MasterPty + Send>>,
    /// PID reported by the child right after spawn. `None` when the
    /// platform reported none, in which case [`Terminal::kill`] sends no
    /// signals at all.
    pub(super) child_pid: Option<u32>,
    /// Cloned from the child before the waiter thread takes ownership, since
    /// termination needs to reach the child while the waiter is blocked in
    /// `wait()`. Stored uniformly across platforms (Unix still prefers the
    /// group-kill path) because the `process_group` signatures are shared.
    pub(super) killer: std::sync::Mutex<Box<dyn ChildKiller + Send + Sync>>,
    /// See [`ExitChannel`].
    pub(super) exit_chan: Arc<ExitChannel>,
}

/// Single source of truth for "child has exited" with mixed async and
/// sync waiters. The waiter thread publishes once; both wakers fire
/// from the same lock-protected state transition. The `Notify` wakes
/// [`Terminal::wait_async`] cancellation-safely; the [`Condvar`] wakes
/// the synchronous SIGTERM-grace deadline loop without requiring a
/// tokio runtime in [`Drop`].
pub(super) struct ExitChannel {
    /// Current exit state. Both wakers are signalled only after this
    /// changes, and waiters always re-read it after waking.
    state: Mutex<ExitState>,
    /// Wakes threads parked in `wait_for_exit_or_deadline`.
    sync_cvar: Condvar,
    /// Wakes `wait_async` futures. `notify_waiters` reaches only futures
    /// that are already registered (`wait_async` calls `enable()` before
    /// checking state for exactly this reason).
    async_notify: Notify,
}

/// Exit state published through [`ExitChannel`]. It starts as `Pending`.
/// Within this module only the child-waiter thread moves it out of
/// `Pending`: via `publish` after a successful `wait()`, or via the drop
/// guard's `publish_if_pending`. Readers take cloned snapshots, so no
/// caller holds the lock while inspecting the state.
#[derive(Clone)]
pub(super) enum ExitState {
    /// Child still running, or waiter thread has not yet observed exit.
    Pending,
    /// Waiter thread observed exit and the reader thread has finished
    /// draining the PTY master.
    Exited(ExitStatus),
    /// Waiter thread terminated without publishing an exit status. Today
    /// this surfaces only when `child.wait()` returns an error or the
    /// waiter thread panics.
    Unavailable,
}

impl ExitChannel {
    /// Build a channel in the [`ExitState::Pending`] state with no waiters.
    fn new() -> Self {
        let state = Mutex::new(ExitState::Pending);
        Self {
            state,
            sync_cvar: Condvar::new(),
            async_notify: Notify::new(),
        }
    }

    /// Lock the state, recovering the guard if a previous holder panicked
    /// (the state is a plain enum, so a poisoned value is still coherent).
    fn lock_state(&self) -> std::sync::MutexGuard<'_, ExitState> {
        self.state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Wake every parked synchronous waiter, then every registered async
    /// waiter. Callers invoke this only after releasing the state lock.
    fn wake_all(&self) {
        self.sync_cvar.notify_all();
        self.async_notify.notify_waiters();
    }

    /// Overwrite the state unconditionally and wake all waiters.
    fn publish(&self, new: ExitState) {
        // The temporary guard is released at the end of this statement,
        // before any waiter is woken.
        *self.lock_state() = new;
        self.wake_all();
    }

    /// Used by the waiter thread's drop guard so a panic or early return
    /// surfaces as `Unavailable` instead of leaving sync waiters stuck on
    /// the cvar until their deadline. A state that was already published
    /// is left untouched and nobody is woken.
    fn publish_if_pending(&self, new: ExitState) {
        let mut state = self.lock_state();
        if !matches!(*state, ExitState::Pending) {
            return;
        }
        *state = new;
        drop(state);
        self.wake_all();
    }

    /// Return a copy of the current state without holding the lock past
    /// this call.
    fn snapshot(&self) -> ExitState {
        self.lock_state().clone()
    }
}

impl Terminal<Managed> {
    /// Spawn the [`Builder`]'s configured command inside a new PTY and
    /// return a managed session.
    ///
    /// If a setup step fails *after* the child has already started, the
    /// child is force-killed (best effort) before the error is returned.
    /// Those steps are starting the exit-waiter thread, cloning the PTY
    /// reader, taking the PTY writer, and building the session's I/O
    /// plumbing. Without this, a failed spawn could leave a running process
    /// that no caller holds a handle to.
    ///
    /// # Errors
    ///
    /// - [`Error::MissingCommand`] if the [`Builder`] has no program set.
    /// - On Unix, [`Error::DisableEchoFailed`] if echo is disabled in the
    ///   builder options and the PTY's termios cannot be updated.
    /// - [`Error::OpenPtyFailed`], [`Error::SpawnCommandFailed`],
    ///   [`Error::CloneReaderFailed`], or [`Error::TakeWriterFailed`]
    ///   from the PTY allocation and child-attach steps.
    /// - Any error from starting the child-waiter thread or from building
    ///   the session's reader/writer machinery.
    pub fn spawn(builder: Builder) -> Result<Self> {
        let (cmd, mut opts) = builder.into_parts()?;
        let pty_size = opts.pty_size();
        let span = info_span!(
            "terminal_session_spawn",
            rows = pty_size.rows,
            cols = pty_size.cols,
            pixel_width = pty_size.pixel_width,
            pixel_height = pty_size.pixel_height,
        );
        let _guard = span.enter();
        info!("tastty spawn begin");
        let pty_system = native_pty_system();
        debug!("calling native_pty_system.openpty");
        let pair = pty_system
            .openpty(pty_size)
            .map_err(|source| Error::OpenPtyFailed {
                source: source.into(),
            })?;
        debug!("openpty succeeded");
        #[cfg(unix)]
        if !opts.echo {
            debug!("disabling PTY echo");
            disable_echo(&*pair.master)?;
            debug!("PTY echo disabled");
        }

        debug!("calling slave.spawn_command");
        let mut child =
            pair.slave
                .spawn_command(cmd)
                .map_err(|source| Error::SpawnCommandFailed {
                    source: source.into(),
                })?;
        debug!("spawn_command succeeded");

        let child_pid = child.process_id();
        info!(child_pid = child_pid.unwrap_or(0), "child process spawned");

        // `mut`: the error path below may need to signal the child.
        let mut killer = child.clone_killer();

        let exit_chan = Arc::new(ExitChannel::new());
        let exit_chan_for_waiter = Arc::clone(&exit_chan);
        // Held by the reader thread, dropped on exit. The waiter below
        // blocks on `recv()` (which returns `Err` once the sender is
        // gone) before publishing the exit status. Without this, a
        // caller polling `try_wait` can observe `Some(status)` while
        // bytes the child wrote just before exiting are still queued
        // for the parser, causing wait conditions in higher layers to
        // surface ProcessExitedBeforeMatch for output that is about to
        // materialize on the screen.
        let (reader_done_tx, reader_done_rx) = std::sync::mpsc::channel::<()>();

        // Every step in this closure can fail while the child is already
        // running. Running them as one fallible unit lets the error path
        // below tear the child down instead of leaving it alive behind an
        // `Err`. If the child ignored the PTY hangup, its waiter thread
        // would otherwise stay blocked in `wait()` as well.
        let attached = (|| -> Result<_> {
            let _waiter = spawn_named(CHILD_WAITER_THREAD_NAME, move || {
                // Drop guard so a panic or `child.wait()` error still
                // wakes synchronous deadline waiters with `Unavailable`
                // instead of leaving them parked until their timeout.
                struct PublishOnDrop(Arc<ExitChannel>);
                impl Drop for PublishOnDrop {
                    fn drop(&mut self) {
                        self.0.publish_if_pending(ExitState::Unavailable);
                    }
                }
                let guard = PublishOnDrop(exit_chan_for_waiter);
                if let Ok(status) = child.wait() {
                    let _done = reader_done_rx.recv();
                    guard
                        .0
                        .publish(ExitState::Exited(ExitStatus::from_portable(status)));
                }
            })?;

            debug!("cloning PTY reader");
            let reader = pair
                .master
                .try_clone_reader()
                .map_err(|source| Error::CloneReaderFailed {
                    source: source.into(),
                })?;
            debug!("PTY reader clone succeeded");

            debug!("taking PTY writer");
            let writer = pair
                .master
                .take_writer()
                .map_err(|source| Error::TakeWriterFailed {
                    source: source.into(),
                })?;
            debug!("PTY writer acquired");

            debug!("building terminal session");
            build_common(reader, writer, &mut opts, Some(reader_done_tx))
        })();

        let common = match attached {
            Ok(common) => common,
            Err(err) => {
                // Best effort: the setup error is what the caller needs, so
                // a failure to deliver the kill is deliberately ignored.
                // Prefer the same group kill `kill()` escalates to, and fall
                // back to the child's own killer when there is no PID or the
                // group kill fails.
                let group_killed = match child_pid {
                    Some(pid) => process_group::sigkill_group(pid, killer.as_mut()).is_ok(),
                    None => false,
                };
                if !group_killed {
                    let _ = killer.kill();
                }
                return Err(err);
            }
        };

        info!(child_pid = child_pid.unwrap_or(0), "tastty spawn complete");
        Ok(Self {
            parser: common.parser,
            writer_tx: Some(common.writer_tx),
            managed: Some(ManagedState {
                pty_master: Mutex::new(pair.master),
                child_pid,
                killer: std::sync::Mutex::new(killer),
                exit_chan,
            }),
            reader_handle: Some(common.reader_handle),
            writer_handle: Some(common.writer_handle),
            dirty: common.dirty,
            redraw_notify: common.redraw_notify,
            event_rx: std::sync::Mutex::new(Some(common.event_rx)),
            io_error_rx: std::sync::Mutex::new(Some(common.io_error_rx)),
            last_sync_arrival_ms: common.last_sync_arrival_ms,
            epoch: common.epoch,
            virtual_cols: common.virtual_cols,
            key_callback: common.key_callback,
            _mode: std::marker::PhantomData,
        })
    }

    /// Borrow the managed-only session state.
    ///
    /// # Panics
    ///
    /// Panics if `managed` is `None`, which [`spawn`](Self::spawn) never
    /// produces for a `Terminal<Managed>`.
    fn managed_state(&self) -> &ManagedState {
        self.managed
            .as_ref()
            .expect("Terminal<Managed> always carries managed state")
    }

    /// Resize the PTY and virtual screen.
    ///
    /// [`AbsolutePosition`](tastty_core::AbsolutePosition) handles held
    /// by callers are not remapped across this call: a row that
    /// previously sat at one absolute index may move to a different one
    /// once the virtual screen re-buckets its buffer, so callers that
    /// hold absolute positions (selection anchors, search hits,
    /// scrollback bookmarks) must re-resolve them after the call returns.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidResize`] when either dimension is zero, or
    /// [`Error::ResizeFailed`] if the underlying PTY resize fails. If the
    /// child requested in-band resize reports, this can also return the same
    /// errors as [`send`](Self::send).
    pub fn resize(&self, size: TerminalSize) -> Result<()> {
        let TerminalSize { rows, cols } = size;
        if cols == 0 || rows == 0 {
            return Err(Error::InvalidResize { rows, cols });
        }
        let (pixel_cell, in_band) = {
            let parser = self.parser_read();
            (
                parser.screen().pixel_cell_size(),
                parser.screen().mode(TerminalMode::InBandResize),
            )
        };
        let pixel_width = cols.saturating_mul(pixel_cell.width);
        let pixel_height = rows.saturating_mul(pixel_cell.height);
        let pty_size = PtySize {
            rows,
            cols,
            pixel_width,
            pixel_height,
        };
        self.managed_state()
            .pty_master
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .resize(pty_size)
            .map_err(|source| Error::ResizeFailed {
                rows,
                cols,
                source: source.into(),
            })?;
        // With a virtual column count configured, the parser keeps that
        // width regardless of the PTY's real width.
        let parser_cols = self.virtual_cols.unwrap_or(cols);
        {
            let mut parser = self.parser_write();
            parser.screen_mut().set_size(TerminalSize {
                rows,
                cols: parser_cols,
            });
        }
        if in_band {
            // In-band resize report: CSI 48 ; rows ; cols ; px-h ; px-w t
            let resp = format!("\x1b[48;{rows};{cols};{pixel_height};{pixel_width}t");
            self.send(resp.as_bytes())?;
        }
        Ok(())
    }

    /// Poll the managed child process without blocking.
    ///
    /// Returns `Ok(None)` while the child is still running, or while the
    /// reader thread is still draining bytes the child wrote before
    /// exiting. The transition to `Ok(Some(status))` is published only
    /// after the reader has observed EOF on the PTY master, so callers
    /// can rely on every byte the child sent being parsed and reflected
    /// in [`with_screen`](Self::with_screen) before the exit becomes
    /// observable here.
    ///
    /// # Errors
    ///
    /// Returns [`Error::ExitStatusUnavailable`] if the waiter thread
    /// terminated without observing an exit status (e.g. `child.wait()`
    /// itself returned an error).
    pub fn try_wait(&self) -> Result<Option<ExitStatus>> {
        try_wait_inner(self.managed_state())
    }

    /// Wait asynchronously for the managed child process to exit.
    ///
    /// Resolves only after the reader thread has finished draining the
    /// PTY master, so a caller that awaits this and then snapshots the
    /// screen is guaranteed to see every byte the child emitted before
    /// exiting. See [`try_wait`](Self::try_wait) for the same ordering
    /// guarantee on the polling shape.
    ///
    /// # Cancellation safety
    ///
    /// Cancellation-safe; dropping the returned future only deregisters
    /// from the inner [`Notify`]. Safe to use as a `tokio::select!` arm
    /// and from multiple awaiters concurrently. Each call observes the
    /// same exit status.
    ///
    /// # Errors
    ///
    /// Returns [`Error::ExitStatusUnavailable`] if the waiter thread
    /// terminated without observing an exit status (e.g. `child.wait()`
    /// itself returned an error).
    pub async fn wait_async(&self) -> Result<ExitStatus> {
        let chan = &self.managed_state().exit_chan;
        loop {
            // Subscribe to the next wakeup before re-reading state so a
            // publication that races with this poll cannot be missed.
            let notified = chan.async_notify.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();
            match chan.snapshot() {
                ExitState::Exited(status) => return Ok(status),
                ExitState::Unavailable => return Err(Error::ExitStatusUnavailable),
                ExitState::Pending => {}
            }
            notified.await;
        }
    }

    /// Return whether the managed child process has exited.
    ///
    /// Also `true` when the exit status is unavailable (the waiter thread
    /// gave up without one); `false` until the exit has been published.
    #[must_use]
    pub fn is_finished(&self) -> bool {
        !matches!(
            self.managed_state().exit_chan.snapshot(),
            ExitState::Pending
        )
    }

    /// Send SIGTERM (then SIGKILL if needed) to the child process group
    /// without destroying the session. Screen state remains accessible
    /// after the process is killed.
    ///
    /// The grace wait before escalating to SIGKILL is capped at 100 ms but
    /// wakes early on actual child exit, so a cooperating child typically
    /// costs only a few ms; the full 100 ms is paid only when the child
    /// traps SIGTERM and runs cleanup up to the deadline.
    ///
    /// Does nothing if the child has already finished or no PID was
    /// reported for it.
    ///
    /// # Errors
    ///
    /// Returns [`Error::TerminateFailed`] if SIGTERM cannot be sent, or
    /// [`Error::ForceKillFailed`] if SIGKILL escalation fails.
    pub fn kill(&self) -> Result<()> {
        if self.is_finished() {
            return Ok(());
        }

        let managed = self.managed_state();
        let Some(pid) = managed.child_pid else {
            return Ok(());
        };

        {
            let mut killer = managed.killer.lock().unwrap_or_else(|e| e.into_inner());
            process_group::sigterm_group(pid, killer.as_mut())
                .map_err(|source| Error::TerminateFailed { source })?;
        }

        wait_for_exit_or_deadline(managed, SHUTDOWN_TIMEOUT);

        if !self.is_finished() {
            let mut killer = managed.killer.lock().unwrap_or_else(|e| e.into_inner());
            process_group::sigkill_group(pid, killer.as_mut())
                .map_err(|source| Error::ForceKillFailed { source })?;
        }

        Ok(())
    }

    /// Return the managed child process ID, when available.
    #[must_use]
    pub fn process_id(&self) -> Option<u32> {
        self.managed_state().child_pid
    }
}

/// Non-blocking exit-status read shared by [`Terminal::try_wait`] and the
/// best-effort [`poll_exit`] used in [`Drop`].
///
/// `Ok(Some(status))` means the waiter thread published an exit, which
/// it does only after the reader thread has finished draining the PTY
/// master. `Ok(None)` means the child is still running or the reader is
/// still parsing buffered output. `Err(Error::ExitStatusUnavailable)`
/// means the waiter thread terminated without publishing an exit
/// status, which today only happens if `child.wait()` itself returned
/// an error.
fn try_wait_inner(managed: &ManagedState) -> Result<Option<ExitStatus>> {
    match managed.exit_chan.snapshot() {
        ExitState::Exited(status) => Ok(Some(status)),
        ExitState::Pending => Ok(None),
        ExitState::Unavailable => Err(Error::ExitStatusUnavailable),
    }
}

pub(super) fn poll_exit(managed: &ManagedState) -> Option<ExitStatus> {
    match managed.exit_chan.snapshot() {
        ExitState::Exited(status) => Some(status),
        ExitState::Pending | ExitState::Unavailable => None,
    }
}

pub(super) fn wait_for_exit_or_deadline(managed: &ManagedState, timeout: Duration) {
    let chan = &*managed.exit_chan;
    let guard = chan.state.lock().unwrap_or_else(|e| e.into_inner());
    drop(
        chan.sync_cvar
            .wait_timeout_while(guard, timeout, |s| matches!(s, ExitState::Pending))
            .unwrap_or_else(|e| e.into_inner()),
    );
}

#[cfg(unix)]
fn disable_echo(master: &dyn MasterPty) -> Result<()> {
    use nix::sys::termios::{LocalFlags, SetArg, tcgetattr, tcsetattr};
    use std::fs::OpenOptions;
    use std::os::unix::fs::OpenOptionsExt;

    let Some(path) = master.tty_name() else {
        return Ok(());
    };

    // O_NOCTTY: this fd exists only to twiddle termios. It must never become
    // the calling process's controlling terminal. Without this flag, a
    // tastty consumer running inside another tastty session (or any process
    // whose own ctty is a different PTY) hits a SIGHUP path during the
    // subsequent fork inside slave.spawn_command and the consumer exits
    // with status 129 before the spawn completes.
    let tty = OpenOptions::new()
        .read(true)
        .write(true)
        .custom_flags(libc::O_NOCTTY)
        .open(path)
        .map_err(|source| Error::DisableEchoFailed { source })?;
    let mut termios = tcgetattr(&tty).map_err(|e| Error::DisableEchoFailed { source: e.into() })?;
    termios.local_flags &= !LocalFlags::ECHO;
    tcsetattr(&tty, SetArg::TCSANOW, &termios)
        .map_err(|e| Error::DisableEchoFailed { source: e.into() })?;
    Ok(())
}