use anyhow::{Context, Result};
use portable_pty::{ChildKiller, CommandBuilder, MasterPty, NativePtySystem, PtySize, PtySystem};
use std::collections::VecDeque;
use std::fs::OpenOptions;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
const BACKLOG_CAP: usize = 1 << 20;
#[derive(Default)]
pub struct OutputHub {
inner: Mutex<HubInner>,
}
#[derive(Default)]
struct HubInner {
backlog: VecDeque<u8>,
clients: Vec<UnboundedSender<Vec<u8>>>,
}
impl OutputHub {
pub fn new() -> Arc<Self> {
Arc::new(Self::default())
}
pub fn broadcast(&self, data: &[u8]) {
let Ok(mut g) = self.inner.lock() else {
return;
};
g.backlog.extend(data);
let overflow = g.backlog.len().saturating_sub(BACKLOG_CAP);
if overflow > 0 {
g.backlog.drain(..overflow);
}
if !g.clients.is_empty() {
let chunk = data.to_vec();
g.clients.retain(|tx| tx.send(chunk.clone()).is_ok());
}
}
pub fn subscribe(&self) -> UnboundedReceiver<Vec<u8>> {
let (tx, rx) = unbounded_channel();
if let Ok(mut g) = self.inner.lock() {
if !g.backlog.is_empty() {
let snapshot: Vec<u8> = g.backlog.iter().copied().collect();
let _ = tx.send(snapshot);
}
g.clients.push(tx);
}
rx
}
}
pub struct Pane {
pub writer: Mutex<Box<dyn Write + Send>>,
master: Option<Mutex<Box<dyn MasterPty + Send>>>,
killer: Mutex<Box<dyn ChildKiller + Send + Sync>>,
pub pid: Option<u32>,
pub exit_status: Arc<Mutex<Option<ExitInfo>>>,
pub exit_notify: Arc<tokio::sync::Notify>,
pub reader_done: Arc<tokio::sync::Notify>,
}
#[derive(Debug, Clone, Copy)]
pub struct ExitInfo {
pub code: Option<i32>,
pub signaled: bool,
}
impl Pane {
pub fn spawn(
cmd: &[String],
rows: u16,
cols: u16,
extra_env: &[(String, String)],
output_log: Option<&Path>,
hub: Arc<OutputHub>,
tty: bool,
) -> Result<Self> {
anyhow::ensure!(!cmd.is_empty(), "empty command");
let child: Box<dyn portable_pty::Child + Send + Sync>;
let writer: Box<dyn Write + Send>;
let master: Option<Mutex<Box<dyn MasterPty + Send>>>;
let mut readers: Vec<Box<dyn Read + Send>> = Vec::new();
if tty {
let pty_system = NativePtySystem::default();
let pair = pty_system
.openpty(PtySize {
rows,
cols,
pixel_width: 0,
pixel_height: 0,
})
.context("openpty failed")?;
let mut builder = CommandBuilder::new(&cmd[0]);
for arg in &cmd[1..] {
builder.arg(arg);
}
if let Ok(cwd) = std::env::current_dir() {
builder.cwd(cwd);
}
for (k, v) in extra_env {
builder.env(k, v);
}
let spawned = pair
.slave
.spawn_command(builder)
.with_context(|| format!("spawning {:?}", cmd))?;
drop(pair.slave);
readers.push(
pair.master
.try_clone_reader()
.context("cloning PTY reader")?,
);
writer = pair.master.take_writer().context("taking PTY writer")?;
master = Some(Mutex::new(pair.master));
child = spawned;
} else {
use std::process::{Command, Stdio};
let mut c = Command::new(&cmd[0]);
c.args(&cmd[1..]);
if let Ok(cwd) = std::env::current_dir() {
c.current_dir(cwd);
}
for (k, v) in extra_env {
c.env(k, v);
}
c.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
let mut spawned = c.spawn().with_context(|| format!("spawning {:?}", cmd))?;
writer = Box::new(spawned.stdin.take().context("taking child stdin")?);
readers.push(Box::new(
spawned.stdout.take().context("taking child stdout")?,
));
readers.push(Box::new(
spawned.stderr.take().context("taking child stderr")?,
));
master = None;
child = Box::new(spawned);
}
let killer = child.clone_killer();
let pid = child.process_id();
let exit_status: Arc<Mutex<Option<ExitInfo>>> = Arc::new(Mutex::new(None));
let exit_notify = Arc::new(tokio::sync::Notify::new());
let reader_done = Arc::new(tokio::sync::Notify::new());
let log_path: Option<PathBuf> = output_log.map(|p| p.to_path_buf());
let remaining = Arc::new(AtomicUsize::new(readers.len()));
for reader in readers {
spawn_output_reader(
reader,
log_path.clone(),
hub.clone(),
remaining.clone(),
reader_done.clone(),
);
}
let child = Arc::new(Mutex::new(child));
{
let child = child.clone();
let exit_status = exit_status.clone();
let exit_notify = exit_notify.clone();
thread::spawn(move || {
let status = {
let mut guard = child.lock().unwrap();
guard.wait()
};
let info = match status {
Ok(s) => {
let signaled = s.signal().is_some();
ExitInfo {
code: if signaled {
None
} else {
s.exit_code().try_into().ok()
},
signaled,
}
}
Err(_) => ExitInfo {
code: None,
signaled: true,
},
};
if let Ok(mut g) = exit_status.lock() {
*g = Some(info);
}
exit_notify.notify_waiters();
exit_notify.notify_one();
});
}
Ok(Self {
writer: Mutex::new(writer),
master,
killer: Mutex::new(killer),
pid,
exit_status,
exit_notify,
reader_done,
})
}
pub fn write_input(&self, bytes: &[u8]) {
if let Ok(mut w) = self.writer.lock() {
let _ = w.write_all(bytes);
let _ = w.flush();
}
}
pub fn resize(&self, rows: u16, cols: u16) {
if rows == 0 || cols == 0 {
return;
}
if let Some(master) = &self.master
&& let Ok(m) = master.lock()
{
let _ = m.resize(PtySize {
rows,
cols,
pixel_width: 0,
pixel_height: 0,
});
}
}
pub fn exit_info(&self) -> Option<ExitInfo> {
self.exit_status.lock().ok().and_then(|g| *g)
}
pub fn kill(&self) {
if let Ok(mut k) = self.killer.lock() {
let _ = k.kill();
}
}
}
fn spawn_output_reader(
mut reader: Box<dyn Read + Send>,
log_path: Option<PathBuf>,
hub: Arc<OutputHub>,
remaining: Arc<AtomicUsize>,
reader_done: Arc<tokio::sync::Notify>,
) {
thread::spawn(move || {
let mut log_file =
log_path.and_then(|p| OpenOptions::new().create(true).append(true).open(&p).ok());
let mut buf = [0u8; 8192];
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
hub.broadcast(&buf[..n]);
if let Some(f) = log_file.as_mut() {
let _ = f.write_all(&buf[..n]);
}
}
Err(_) => break,
}
}
if remaining.fetch_sub(1, Ordering::SeqCst) == 1 {
reader_done.notify_waiters();
reader_done.notify_one();
}
});
}