use anyhow::{Context, Result};
use portable_pty::{ChildKiller, CommandBuilder, MasterPty, NativePtySystem, PtySize, PtySystem};
use std::collections::VecDeque;
use std::fs::OpenOptions;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::thread;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
const BACKLOG_CAP: usize = 1 << 20;
#[derive(Default)]
pub struct OutputHub {
inner: Mutex<HubInner>,
}
#[derive(Default)]
struct HubInner {
backlog: VecDeque<u8>,
clients: Vec<UnboundedSender<Vec<u8>>>,
}
impl OutputHub {
pub fn new() -> Arc<Self> {
Arc::new(Self::default())
}
pub fn broadcast(&self, data: &[u8]) {
let Ok(mut g) = self.inner.lock() else {
return;
};
g.backlog.extend(data);
let overflow = g.backlog.len().saturating_sub(BACKLOG_CAP);
if overflow > 0 {
g.backlog.drain(..overflow);
}
if !g.clients.is_empty() {
let chunk = data.to_vec();
g.clients.retain(|tx| tx.send(chunk.clone()).is_ok());
}
}
pub fn subscribe(&self) -> UnboundedReceiver<Vec<u8>> {
let (tx, rx) = unbounded_channel();
if let Ok(mut g) = self.inner.lock() {
if !g.backlog.is_empty() {
let snapshot: Vec<u8> = g.backlog.iter().copied().collect();
let _ = tx.send(snapshot);
}
g.clients.push(tx);
}
rx
}
}
pub struct Pane {
pub writer: Mutex<Box<dyn Write + Send>>,
pub master: Mutex<Box<dyn MasterPty + Send>>,
killer: Mutex<Box<dyn ChildKiller + Send + Sync>>,
pub pid: Option<u32>,
pub exit_status: Arc<Mutex<Option<ExitInfo>>>,
pub exit_notify: Arc<tokio::sync::Notify>,
pub reader_done: Arc<tokio::sync::Notify>,
}
#[derive(Debug, Clone, Copy)]
pub struct ExitInfo {
pub code: Option<i32>,
pub signaled: bool,
}
impl Pane {
pub fn spawn(
cmd: &[String],
rows: u16,
cols: u16,
extra_env: &[(String, String)],
output_log: Option<&Path>,
hub: Arc<OutputHub>,
) -> Result<Self> {
anyhow::ensure!(!cmd.is_empty(), "empty command");
let pty_system = NativePtySystem::default();
let pair = pty_system
.openpty(PtySize {
rows,
cols,
pixel_width: 0,
pixel_height: 0,
})
.context("openpty failed")?;
let mut builder = CommandBuilder::new(&cmd[0]);
for arg in &cmd[1..] {
builder.arg(arg);
}
if let Ok(cwd) = std::env::current_dir() {
builder.cwd(cwd);
}
for (k, v) in extra_env {
builder.env(k, v);
}
let child = pair
.slave
.spawn_command(builder)
.with_context(|| format!("spawning {:?}", cmd))?;
let killer = child.clone_killer();
let pid = child.process_id();
drop(pair.slave);
let exit_status: Arc<Mutex<Option<ExitInfo>>> = Arc::new(Mutex::new(None));
let exit_notify = Arc::new(tokio::sync::Notify::new());
let mut reader = pair
.master
.try_clone_reader()
.context("cloning PTY reader")?;
let log_path: Option<PathBuf> = output_log.map(|p| p.to_path_buf());
let reader_done = Arc::new(tokio::sync::Notify::new());
{
let reader_done = reader_done.clone();
let hub = hub.clone();
thread::spawn(move || {
let mut log_file = log_path
.and_then(|p| OpenOptions::new().create(true).append(true).open(&p).ok());
let mut buf = [0u8; 8192];
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
hub.broadcast(&buf[..n]);
if let Some(f) = log_file.as_mut() {
let _ = f.write_all(&buf[..n]);
}
}
Err(_) => break,
}
}
reader_done.notify_waiters();
reader_done.notify_one();
});
}
let writer = pair.master.take_writer().context("taking PTY writer")?;
let child = Arc::new(Mutex::new(child));
{
let child = child.clone();
let exit_status = exit_status.clone();
let exit_notify = exit_notify.clone();
thread::spawn(move || {
let status = {
let mut guard = child.lock().unwrap();
guard.wait()
};
let info = match status {
Ok(s) => {
let signaled = s.signal().is_some();
ExitInfo {
code: if signaled {
None
} else {
s.exit_code().try_into().ok()
},
signaled,
}
}
Err(_) => ExitInfo {
code: None,
signaled: true,
},
};
if let Ok(mut g) = exit_status.lock() {
*g = Some(info);
}
exit_notify.notify_waiters();
exit_notify.notify_one();
});
}
Ok(Self {
writer: Mutex::new(writer),
master: Mutex::new(pair.master),
killer: Mutex::new(killer),
pid,
exit_status,
exit_notify,
reader_done,
})
}
pub fn write_input(&self, bytes: &[u8]) {
if let Ok(mut w) = self.writer.lock() {
let _ = w.write_all(bytes);
let _ = w.flush();
}
}
pub fn resize(&self, rows: u16, cols: u16) {
if rows == 0 || cols == 0 {
return;
}
if let Ok(m) = self.master.lock() {
let _ = m.resize(PtySize {
rows,
cols,
pixel_width: 0,
pixel_height: 0,
});
}
}
pub fn exit_info(&self) -> Option<ExitInfo> {
self.exit_status.lock().ok().and_then(|g| *g)
}
pub fn kill(&self) {
if let Ok(mut k) = self.killer.lock() {
let _ = k.kill();
}
}
}