babysit 0.7.2

Wrap a shell command in a PTY and expose it to external AI agents (Claude / Codex) via subcommands
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
//! A `Pane` wraps a PTY pair, the child process, and the threads that
//! ferry bytes between the master fd and attached clients.
//!
//! Output bytes from the PTY are tee'd to a log file and fanned out through
//! an `OutputHub` to any attached clients. They are also fed into a `vt100`
//! virtual-terminal parser so `babysit screenshot` can render the current
//! on-screen grid (the client's own terminal still renders the live bytes
//! directly for `attach`).

use crate::cli::ShotFormat;
use anyhow::{Context, Result};
use portable_pty::{ChildKiller, CommandBuilder, MasterPty, NativePtySystem, PtySize, PtySystem};
use std::collections::VecDeque;
use std::fs::OpenOptions;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};

/// Maximum bytes of recent PTY output retained for replay to a freshly
/// attached client, so attaching shows the current screen/context. Older
/// output is still on disk in the session log (when one is configured).
///
/// `OutputHub::broadcast` enforces the cap by discarding the oldest bytes
/// whenever the in-memory backlog grows past it.
const BACKLOG_CAP: usize = 1 << 20; // 1 MiB

/// Fans PTY output out to attached clients and keeps a bounded backlog so a
/// newly attached client can be caught up. The backlog and client list share
/// one lock, so `subscribe` snapshots the backlog and registers atomically —
/// a client sees the backlog then live output with no gap and no duplicate.
#[derive(Default)]
pub struct OutputHub {
    /// Backlog and client list, guarded together by a single mutex.
    inner: Mutex<HubInner>,
}

/// The state protected by `OutputHub`'s mutex.
#[derive(Default)]
struct HubInner {
    /// Most recent output bytes, oldest first. `broadcast` appends to the
    /// back and trims the front so it never stays above `BACKLOG_CAP`.
    backlog: VecDeque<u8>,
    /// One sender per attached client. A sender whose `send` fails is removed
    /// during the next `broadcast`.
    clients: Vec<UnboundedSender<Vec<u8>>>,
}

impl OutputHub {
    /// Build an empty hub (no backlog, no clients) wrapped in an `Arc` so it
    /// can be handed to several owners.
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Record `data` in the backlog (trimming the oldest bytes beyond
    /// `BACKLOG_CAP`) and forward a copy to every attached client. Clients
    /// whose channel can no longer accept data are forgotten.
    ///
    /// If the internal lock is poisoned the chunk is silently dropped.
    pub fn broadcast(&self, data: &[u8]) {
        let mut hub = match self.inner.lock() {
            Ok(guard) => guard,
            Err(_) => return,
        };

        hub.backlog.extend(data.iter().copied());
        if hub.backlog.len() > BACKLOG_CAP {
            let excess = hub.backlog.len() - BACKLOG_CAP;
            hub.backlog.drain(..excess);
        }

        // Nobody attached: skip the copy entirely.
        if hub.clients.is_empty() {
            return;
        }
        let payload = data.to_vec();
        hub.clients
            .retain(|client| client.send(payload.clone()).is_ok());
    }

    /// Attach a new client. The returned receiver is pre-loaded with a
    /// snapshot of the backlog (when there is one) and afterwards receives
    /// live output. Snapshot and registration happen under the same lock.
    ///
    /// If the internal lock is poisoned, nothing is registered and the
    /// receiver is returned without any sender attached to it.
    pub fn subscribe(&self) -> UnboundedReceiver<Vec<u8>> {
        let (sender, receiver) = unbounded_channel();
        let Ok(mut hub) = self.inner.lock() else {
            return receiver;
        };

        if !hub.backlog.is_empty() {
            // The deque may wrap around; stitch its two halves back together.
            let (front, back) = hub.backlog.as_slices();
            let mut replay = Vec::with_capacity(front.len() + back.len());
            replay.extend_from_slice(front);
            replay.extend_from_slice(back);
            let _ = sender.send(replay);
        }
        hub.clients.push(sender);
        receiver
    }
}

/// A spawned child process together with the handles used to drive it: a
/// writer for its input, an optional PTY master, a killer, exit/EOF
/// notifications, the virtual-terminal screen, and output-activity counters.
/// Constructed by `Pane::spawn`.
pub struct Pane {
    /// Writer feeding the child's input: the PTY writer in tty mode, the
    /// child's piped stdin in no-tty mode.
    pub writer: Mutex<Box<dyn Write + Send>>,
    /// PTY master, used for resizing. `None` in no-tty (pipe) mode.
    master: Option<Mutex<Box<dyn MasterPty + Send>>>,
    /// Independent signaller for the child. Kept separate from the child
    /// handle (which the wait thread holds locked for the entire duration of
    /// its blocking `wait()`) so `kill()` never has to contend with it.
    killer: Mutex<Box<dyn ChildKiller + Send + Sync>>,
    /// OS process id of the child, if known.
    pub pid: Option<u32>,
    /// Latest known exit status, set by the wait thread when the child exits.
    pub exit_status: Arc<Mutex<Option<ExitInfo>>>,
    /// Notified once when the child exits, so async callers can `await` it.
    pub exit_notify: Arc<tokio::sync::Notify>,
    /// Notified once the reader thread has drained all PTY output (to stdout
    /// and the log) and seen EOF. Lets shutdown wait for the final bytes
    /// instead of racing `process::exit` against the last flush.
    pub reader_done: Arc<tokio::sync::Notify>,
    /// Virtual terminal: every output byte is fed here so we can render the
    /// current visible screen for `babysit screenshot`.
    screen: Arc<Mutex<vt100::Parser>>,
    /// Output activity counters, updated by the reader thread(s).
    activity: Arc<Activity>,
}

/// Cheap, lock-free probes for "has output changed?" and "how long since the
/// last output?", shared with the reader thread(s).
///
/// * `seq` increments on every output chunk, so an agent can poll `status`
///   and tell whether the screen moved without re-fetching a screenshot.
/// * `last_ms` is the epoch-millis timestamp of the most recent output (seeded
///   at spawn), so the worker can enforce an idle timeout.
pub struct Activity {
    /// Number of output chunks read so far; starts at 0 and only grows.
    pub seq: AtomicU64,
    /// Wall-clock time of the latest output chunk, in milliseconds since the
    /// Unix epoch (as produced by `now_ms`).
    pub last_ms: AtomicU64,
}

/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}

/// How the child process ended, as recorded by the wait thread.
///
/// Comparable (`PartialEq`/`Eq`) so callers and tests can check an outcome
/// with `==` / `assert_eq!` instead of matching field by field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ExitInfo {
    /// Numeric exit code. `None` when the process was signalled, when waiting
    /// on it failed, or when the reported code does not fit in an `i32`.
    pub code: Option<i32>,
    /// True if the process was terminated by a signal. Also set when waiting
    /// on the child itself failed, since no exit code is available then.
    pub signaled: bool,
}

impl Pane {
    /// Spawn `cmd[0]` with `cmd[1..]` as arguments. With `tty` it runs inside
    /// a fresh PTY of the given size (so interactive programs behave); without
    /// it the process is run with plain pipes (so programs that detect a
    /// non-tty emit clean, line-oriented output). Output is fanned out through
    /// `hub` to attached clients and tee'd to `output_log` if provided.
    pub fn spawn(
        cmd: &[String],
        rows: u16,
        cols: u16,
        extra_env: &[(String, String)],
        output_log: Option<&Path>,
        hub: Arc<OutputHub>,
        tty: bool,
    ) -> Result<Self> {
        anyhow::ensure!(!cmd.is_empty(), "empty command");

        // Each backend yields: the child handle, a writer for its stdin, an
        // optional PTY master (for resize), and one or more output readers.
        let child: Box<dyn portable_pty::Child + Send + Sync>;
        let writer: Box<dyn Write + Send>;
        let master: Option<Mutex<Box<dyn MasterPty + Send>>>;
        let mut readers: Vec<Box<dyn Read + Send>> = Vec::new();

        if tty {
            let pty_system = NativePtySystem::default();
            let pair = pty_system
                .openpty(PtySize {
                    rows,
                    cols,
                    pixel_width: 0,
                    pixel_height: 0,
                })
                .context("openpty failed")?;

            let mut builder = CommandBuilder::new(&cmd[0]);
            for arg in &cmd[1..] {
                builder.arg(arg);
            }
            if let Ok(cwd) = std::env::current_dir() {
                builder.cwd(cwd);
            }
            for (k, v) in extra_env {
                builder.env(k, v);
            }

            let spawned = pair
                .slave
                .spawn_command(builder)
                .with_context(|| format!("spawning {:?}", cmd))?;
            // Drop slave — the child has it. Keeping it open in the parent
            // prevents EOF on master read when the child exits.
            drop(pair.slave);

            readers.push(
                pair.master
                    .try_clone_reader()
                    .context("cloning PTY reader")?,
            );
            writer = pair.master.take_writer().context("taking PTY writer")?;
            master = Some(Mutex::new(pair.master));
            child = spawned;
        } else {
            use std::process::{Command, Stdio};
            let mut c = Command::new(&cmd[0]);
            c.args(&cmd[1..]);
            if let Ok(cwd) = std::env::current_dir() {
                c.current_dir(cwd);
            }
            for (k, v) in extra_env {
                c.env(k, v);
            }
            c.stdin(Stdio::piped())
                .stdout(Stdio::piped())
                .stderr(Stdio::piped());
            let mut spawned = c.spawn().with_context(|| format!("spawning {:?}", cmd))?;
            writer = Box::new(spawned.stdin.take().context("taking child stdin")?);
            readers.push(Box::new(
                spawned.stdout.take().context("taking child stdout")?,
            ));
            readers.push(Box::new(
                spawned.stderr.take().context("taking child stderr")?,
            ));
            // portable_pty implements Child/ChildKiller for std::process::Child,
            // so the wait/kill machinery below is identical to the PTY path.
            master = None;
            child = Box::new(spawned);
        }

        // Grab an independent killer + the pid up front, before `child` is
        // moved behind a mutex the wait thread will hold while blocked.
        let killer = child.clone_killer();
        let pid = child.process_id();

        let exit_status: Arc<Mutex<Option<ExitInfo>>> = Arc::new(Mutex::new(None));
        let exit_notify = Arc::new(tokio::sync::Notify::new());
        let reader_done = Arc::new(tokio::sync::Notify::new());
        let log_path: Option<PathBuf> = output_log.map(|p| p.to_path_buf());
        // Virtual terminal sized to the PTY (no scrollback: a screenshot is a
        // single visible frame). Kept in sync with the PTY via `resize`.
        let screen = Arc::new(Mutex::new(vt100::Parser::new(rows, cols, 0)));
        // Seed `last_ms` at spawn so idle time is measured from start, not from
        // the first byte of output.
        let activity = Arc::new(Activity {
            seq: AtomicU64::new(0),
            last_ms: AtomicU64::new(now_ms()),
        });

        // One reader thread per output stream (PTY: 1; pipe: stdout + stderr).
        // `reader_done` fires when the last of them drains and sees EOF.
        let remaining = Arc::new(AtomicUsize::new(readers.len()));
        for reader in readers {
            spawn_output_reader(
                reader,
                log_path.clone(),
                hub.clone(),
                screen.clone(),
                activity.clone(),
                remaining.clone(),
                reader_done.clone(),
            );
        }

        let child = Arc::new(Mutex::new(child));

        // Wait thread: capture exit status when the child finishes and
        // wake any awaiter.
        {
            let child = child.clone();
            let exit_status = exit_status.clone();
            let exit_notify = exit_notify.clone();
            thread::spawn(move || {
                let status = {
                    let mut guard = child.lock().unwrap();
                    guard.wait()
                };
                let info = match status {
                    Ok(s) => {
                        // portable_pty reports signal termination via
                        // `signal()`; the numeric `exit_code()` is a
                        // placeholder (1) in that case, so don't surface it.
                        let signaled = s.signal().is_some();
                        ExitInfo {
                            code: if signaled {
                                None
                            } else {
                                s.exit_code().try_into().ok()
                            },
                            signaled,
                        }
                    }
                    Err(_) => ExitInfo {
                        code: None,
                        signaled: true,
                    },
                };
                if let Ok(mut g) = exit_status.lock() {
                    *g = Some(info);
                }
                exit_notify.notify_waiters();
                // Also notify any future awaiter (notify_one stays armed
                // until consumed, unlike notify_waiters).
                exit_notify.notify_one();
            });
        }

        Ok(Self {
            writer: Mutex::new(writer),
            master,
            killer: Mutex::new(killer),
            pid,
            exit_status,
            exit_notify,
            reader_done,
            screen,
            activity,
        })
    }

    /// Forward raw bytes (typed characters or text from `babysit send`) to
    /// the PTY's stdin.
    pub fn write_input(&self, bytes: &[u8]) {
        if let Ok(mut w) = self.writer.lock() {
            let _ = w.write_all(bytes);
            let _ = w.flush();
        }
    }

    /// Resize the PTY (and its line discipline) to the given dimensions.
    /// No-op in no-tty (pipe) mode, which has no PTY.
    pub fn resize(&self, rows: u16, cols: u16) {
        if rows == 0 || cols == 0 {
            return;
        }
        if let Some(master) = &self.master
            && let Ok(m) = master.lock()
        {
            let _ = m.resize(PtySize {
                rows,
                cols,
                pixel_width: 0,
                pixel_height: 0,
            });
        }
        // Keep the virtual terminal in lock-step with the PTY so screenshots
        // reflect the dimensions the program is actually drawing for.
        if let Ok(mut s) = self.screen.lock() {
            s.screen_mut().set_size(rows, cols);
        }
    }

    /// Render the current visible screen of the virtual terminal in the
    /// requested `format`. See `render_screen` for the output shape.
    pub fn screenshot(&self, format: ShotFormat, trim: bool) -> serde_json::Value {
        match self.screen.lock() {
            Ok(p) => crate::render::render_screen(p.screen(), format, trim),
            Err(_) => serde_json::json!({ "error": "screen lock poisoned" }),
        }
    }

    /// `Some(_)` once the child has exited.
    pub fn exit_info(&self) -> Option<ExitInfo> {
        self.exit_status.lock().ok().and_then(|g| *g)
    }

    /// Monotonic counter of output chunks seen so far. An agent can compare it
    /// across `status` polls to cheaply tell whether the screen has moved.
    pub fn screen_seq(&self) -> u64 {
        self.activity.seq.load(Ordering::Relaxed)
    }

    /// Milliseconds since the most recent output (or since spawn if none yet).
    pub fn idle_ms(&self) -> u64 {
        now_ms().saturating_sub(self.activity.last_ms.load(Ordering::Relaxed))
    }

    /// Signal the child to terminate (best-effort). Uses the independent
    /// killer so it works even while the wait thread is blocked in `wait()`.
    pub fn kill(&self) {
        if let Ok(mut k) = self.killer.lock() {
            let _ = k.kill();
        }
    }
}

/// Pump one output stream to the virtual terminal, the hub and the log on
/// its own blocking thread. When the last live reader (`remaining` reaching
/// zero) finishes, fire `reader_done` so shutdown can wait for the final
/// bytes.
///
/// * `reader` — the stream to drain (PTY master, or a child's stdout/stderr).
/// * `log_path` — file to append raw output to; if it cannot be opened,
///   logging is skipped for this stream.
/// * `hub` — receives every chunk for fan-out to attached clients.
/// * `screen` — virtual terminal that parses every chunk.
/// * `activity` — counters updated once per chunk.
/// * `remaining` / `reader_done` — countdown of live readers and the
///   notification fired by the one that brings it to zero.
///
/// The thread ends on EOF (`Ok(0)`) or on any read error other than
/// `ErrorKind::Interrupted`, which is retried.
fn spawn_output_reader(
    mut reader: Box<dyn Read + Send>,
    log_path: Option<PathBuf>,
    hub: Arc<OutputHub>,
    screen: Arc<Mutex<vt100::Parser>>,
    activity: Arc<Activity>,
    remaining: Arc<AtomicUsize>,
    reader_done: Arc<tokio::sync::Notify>,
) {
    thread::spawn(move || {
        // O_APPEND makes concurrent appends (stdout + stderr) safe without a
        // shared lock.
        let mut log_file =
            log_path.and_then(|p| OpenOptions::new().create(true).append(true).open(&p).ok());
        let mut buf = [0u8; 8192];
        loop {
            let n = match reader.read(&mut buf) {
                Ok(0) => break,
                Ok(n) => n,
                // An interrupted read is not the end of the stream: retry
                // instead of silently dropping the rest of the output.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(_) => break,
            };
            let chunk = &buf[..n];

            activity.last_ms.store(now_ms(), Ordering::Relaxed);
            if let Ok(mut p) = screen.lock() {
                p.process(chunk);
            }
            // Bump `seq` only after the screen has absorbed the chunk. A
            // poller that sees the new value and then takes a screenshot
            // finds the updated frame; bumping first would let it capture
            // the stale frame and never learn about the change.
            activity.seq.fetch_add(1, Ordering::Relaxed);
            hub.broadcast(chunk);
            if let Some(f) = log_file.as_mut() {
                let _ = f.write_all(chunk);
            }
        }
        if remaining.fetch_sub(1, Ordering::SeqCst) == 1 {
            // notify_one arms a permit so a late awaiter still observes it.
            reader_done.notify_waiters();
            reader_done.notify_one();
        }
    });
}