use portable_pty::{native_pty_system, CommandBuilder, PtySize};
use std::io::{self, Read, Write};
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
#[cfg(unix)]
use libc;
/// Caller-supplied settings for idle detection during [`run`].
#[derive(Clone, Debug)]
pub struct IdleConfig {
    /// Seconds of PTY silence after which a keepalive is injected.
    /// A value of `0` disables idle handling entirely.
    pub timeout_secs: u64,
    /// Text written to the PTY (followed by a carriage return) as the keepalive.
    pub prompt: String,
    /// How many keepalives may be injected before the run is declared exhausted.
    pub max_retries: u32,
}
/// Outcome of a [`run`] call that ended with the child exiting.
#[derive(Debug)]
pub struct RunResult {
    /// Exit code of the child. `run` reports `0` here when it terminated the
    /// child itself in response to the exit-signal file.
    pub exit_code: u32,
}
/// Errors produced while spawning or supervising a child on a PTY.
#[derive(Debug, thiserror::Error)]
pub enum PtyError {
/// The PTY pair could not be opened.
#[error("failed to open PTY: {0}")]
Open(anyhow::Error),
/// The command could not be spawned on the PTY slave.
#[error("failed to spawn command: {0}")]
Spawn(anyhow::Error),
/// A reader for the PTY master could not be cloned.
#[error("failed to clone PTY reader: {0}")]
Reader(anyhow::Error),
/// An I/O error; `run` returns this when polling the child's status fails.
#[error("I/O error: {0}")]
Io(#[from] io::Error),
/// The child was still running when the overall timeout (in seconds) elapsed.
#[error("timeout: child did not exit within {0}s")]
Timeout(u64),
/// The child stayed silent for the idle timeout (in seconds) after all
/// keepalive retries had been used.
#[error("idle exhausted: no output for {0}s after maximum keepalive retries")]
IdleExhausted(u64),
}
/// Gives the PTY reader thread a short window to finish, then releases the
/// PTY master.
///
/// The thread is polled every 50 ms for up to two seconds:
/// - if it finishes in time it is joined and `master` is dropped;
/// - otherwise the shared `stop` flag is set, `master` is dropped, and the
///   thread is left detached (its handle is dropped without joining).
fn join_reader(
    handle: JoinHandle<()>,
    stop: &Arc<Mutex<bool>>,
    master: Box<dyn portable_pty::MasterPty + Send>,
) {
    let give_up_at = Instant::now() + Duration::from_secs(2);

    while !handle.is_finished() {
        if Instant::now() > give_up_at {
            // Still running: ask the reader to stop at its next loop check
            // and return without blocking on it.
            if let Ok(mut flag) = stop.lock() {
                *flag = true;
            }
            drop(master);
            return;
        }
        thread::sleep(Duration::from_millis(50));
    }

    // The thread has already terminated, so this join cannot block.
    let _ = handle.join();
    drop(master);
}
/// Handles returned by `spawn_pty_child` for a child running on a fresh PTY.
struct PtySpawn {
/// Master side of the PTY pair.
master: Box<dyn portable_pty::MasterPty + Send>,
/// The spawned child process.
child: Box<dyn portable_pty::Child + Send>,
/// Killer handle cloned from `child`.
killer: Box<dyn portable_pty::ChildKiller + Send>,
/// Reader cloned from the PTY master.
reader: Box<dyn Read + Send>,
}
fn spawn_pty_child(argv: &[String], cwd: &Path) -> Result<PtySpawn, PtyError> {
let pty_system = native_pty_system();
let pair = pty_system
.openpty(PtySize {
rows: 50,
cols: 200,
pixel_width: 0,
pixel_height: 0,
})
.map_err(PtyError::Open)?;
let mut cmd = CommandBuilder::new(&argv[0]);
if argv.len() > 1 {
cmd.args(&argv[1..]);
}
cmd.cwd(cwd);
for (key, val) in std::env::vars_os() {
cmd.env(key, val);
}
for key in &[
"CLAUDE_CODE_SESSION_ID",
"CLAUDE_CODE_CHILD_SESSION",
"CLAUDE_CODE_ENTRYPOINT",
"CLAUDE_CODE_EXECPATH",
"CLAUDECODE",
"AI_AGENT",
] {
cmd.env_remove(key);
}
let child = pair.slave.spawn_command(cmd).map_err(PtyError::Spawn)?;
drop(pair.slave);
let killer = child.clone_killer();
let reader = pair.master.try_clone_reader().map_err(PtyError::Reader)?;
let master = pair.master;
Ok(PtySpawn {
master,
child,
killer,
reader,
})
}
/// Starts a thread that relays everything read from `reader` to stdout.
///
/// Before each read the thread checks `stop_reader` and exits if the flag is
/// set. After every non-empty read it stores the current time in
/// `last_output_reader`, writes the bytes to stdout and flushes. The thread
/// ends on end-of-file, on a failed stdout write, or on any read error other
/// than `Interrupted` (which is retried).
fn spawn_basic_reader(
    mut reader: Box<dyn Read + Send>,
    stop_reader: Arc<Mutex<bool>>,
    last_output_reader: Arc<Mutex<Instant>>,
) -> JoinHandle<()> {
    thread::spawn(move || {
        let console = io::stdout();
        let mut chunk = [0u8; 4096];

        loop {
            // A poisoned flag lock is treated as "not stopped".
            let stop_requested = stop_reader.lock().map(|flag| *flag).unwrap_or(false);
            if stop_requested {
                return;
            }

            let len = match reader.read(&mut chunk) {
                Ok(0) => return,
                Ok(len) => len,
                Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                Err(_) => return,
            };

            if let Ok(mut stamp) = last_output_reader.lock() {
                *stamp = Instant::now();
            }

            let mut sink = console.lock();
            if sink.write_all(&chunk[..len]).is_err() {
                return;
            }
            let _ = sink.flush();
        }
    })
}
/// Mutable idle-detection bookkeeping used by `tick_idle`.
struct IdleState {
    /// Silence threshold in seconds; `0` turns idle handling off.
    timeout: u64,
    /// Text written to the PTY as the keepalive.
    prompt: String,
    /// Upper bound on keepalive injections before giving up.
    max_retries: u32,
    /// Keepalives injected since the counter was last reset.
    retry_count: u32,
    /// When the most recent keepalive was injected, if any.
    last_keepalive: Option<Instant>,
}
// Grace window, in seconds, after a keepalive injection during which the
// idle retry counter is not reset.
const KEEPALIVE_GRACE_SECS: u64 = 20;
impl IdleState {
    /// Builds the initial idle state from an optional caller configuration.
    ///
    /// Without a configuration the timeout is `0`, the prompt is
    /// `"Continue"` and the retry limit is `3`. The retry counter always
    /// starts at zero with no keepalive recorded.
    fn from_config(idle: Option<&IdleConfig>) -> Self {
        let (timeout, prompt, max_retries) = match idle {
            Some(cfg) => (cfg.timeout_secs, cfg.prompt.clone(), cfg.max_retries),
            None => (0, "Continue".to_string(), 3),
        };
        IdleState {
            timeout,
            prompt,
            max_retries,
            retry_count: 0,
            last_keepalive: None,
        }
    }
}
/// Result of one idle-detection step.
enum IdleTick {
    /// The child is idle again and every allowed keepalive has been used.
    Exhausted,
    /// The child was idle and a keepalive was written to the PTY.
    KeepaliveInjected,
    /// The retry counter was reset after earlier keepalives.
    Recovered,
    /// Nothing to do on this tick.
    Ok,
}
/// Runs one idle-detection step.
///
/// * `state` – retry bookkeeping, updated in place.
/// * `last_output` – timestamp of the most recent child output, shared with
///   the reader thread. It is overwritten with the injection time whenever a
///   keepalive is sent, so the next silence window starts from the keepalive.
/// * `pty_writer` – where the keepalive is typed; when `None`, the attempt is
///   still counted but nothing is written.
///
/// Returns:
/// * [`IdleTick::Ok`] when idle handling is disabled (`timeout == 0`) or
///   nothing needs doing;
/// * [`IdleTick::KeepaliveInjected`] when the silence reached the timeout and
///   a keepalive (two ESC bytes, a 50 ms pause, the prompt, `\r`) was sent;
/// * [`IdleTick::Exhausted`] when the silence reached the timeout but
///   `max_retries` keepalives were already used;
/// * [`IdleTick::Recovered`] when output was observed at least
///   `KEEPALIVE_GRACE_SECS` after the last keepalive, which resets the
///   retry counter.
fn tick_idle(
    state: &mut IdleState,
    last_output: &Arc<Mutex<Instant>>,
    pty_writer: &mut Option<Box<dyn Write + Send>>,
) -> IdleTick {
    if state.timeout == 0 {
        return IdleTick::Ok;
    }

    // A poisoned lock yields `None`, which is treated as "no silence" below.
    let last_seen: Option<Instant> = last_output.lock().ok().map(|guard| *guard);
    let silent = last_seen.map_or(Duration::ZERO, |seen| seen.elapsed());

    if silent >= Duration::from_secs(state.timeout) {
        if state.retry_count >= state.max_retries {
            return IdleTick::Exhausted;
        }
        state.retry_count += 1;
        eprintln!(
            "heartbeat-launch: idle detected ({:.0}s silent) — injecting keepalive (attempt {}/{})",
            silent.as_secs_f64(),
            state.retry_count,
            state.max_retries,
        );
        if let Some(writer) = pty_writer.as_mut() {
            // Best effort: write failures are ignored; continued silence will
            // simply trigger the next retry.
            let _ = writer.write_all(b"\x1b\x1b");
            let _ = writer.flush();
            thread::sleep(Duration::from_millis(50));
            let _ = writer.write_all(state.prompt.as_bytes());
            let _ = writer.write_all(b"\r");
            let _ = writer.flush();
        }
        let injected_at = Instant::now();
        state.last_keepalive = Some(injected_at);
        // Restart the silence window from the keepalive.
        if let Ok(mut stamp) = last_output.lock() {
            *stamp = injected_at;
        }
        return IdleTick::KeepaliveInjected;
    }

    if state.retry_count == 0 {
        return IdleTick::Ok;
    }

    let grace = Duration::from_secs(KEEPALIVE_GRACE_SECS);
    let recovered = match (state.last_keepalive, last_seen) {
        // The injection itself stamps `last_output`, so elapsed time since the
        // keepalive proves nothing. Only an output timestamp that is at least
        // the grace window later than the keepalive shows the child really
        // produced output again.
        (Some(sent), Some(seen)) => seen.saturating_duration_since(sent) >= grace,
        // Timestamp unreadable (poisoned lock): fall back to elapsed time.
        (Some(sent), None) => sent.elapsed() >= grace,
        // Retries recorded without a keepalive time: nothing to wait for.
        (None, _) => true,
    };

    if !recovered {
        return IdleTick::Ok;
    }

    eprintln!(
        "heartbeat-launch: output resumed — resetting idle retry counter (was {})",
        state.retry_count
    );
    state.retry_count = 0;
    state.last_keepalive = None;
    IdleTick::Recovered
}
pub fn run(
argv: &[String],
cwd: &Path,
timeout_secs: u64,
exit_signal: Option<&Path>,
idle: Option<&IdleConfig>,
) -> Result<RunResult, PtyError> {
let PtySpawn {
master,
mut child,
mut killer,
reader,
} = spawn_pty_child(argv, cwd)?;
let stop = Arc::new(Mutex::new(false));
let last_output: Arc<Mutex<Instant>> = Arc::new(Mutex::new(Instant::now()));
let reader_thread = spawn_basic_reader(reader, Arc::clone(&stop), Arc::clone(&last_output));
if let Some(sig) = exit_signal {
let _ = std::fs::remove_file(sig);
}
let mut pty_writer: Option<Box<dyn Write + Send>> = master.take_writer().ok();
let poll_interval = Duration::from_millis(100);
let deadline = if timeout_secs > 0 {
Some(Instant::now() + Duration::from_secs(timeout_secs))
} else {
None
};
let mut idle_state = IdleState::from_config(idle);
let mut exit_sent = false;
let exit_code = loop {
match child.try_wait() {
Ok(Some(status)) => break status.exit_code(),
Ok(None) => {
if let Some(dl) = deadline {
if Instant::now() >= dl {
#[cfg(unix)]
{
if let Some(pgid) = master.process_group_leader() {
unsafe {
libc::killpg(pgid, libc::SIGKILL);
}
}
}
let _ = killer.kill(); thread::sleep(Duration::from_millis(500));
join_reader(reader_thread, &stop, master);
return Err(PtyError::Timeout(timeout_secs));
}
}
if !exit_sent {
if let Some(sig) = exit_signal {
if sig.exists() {
let _ = std::fs::remove_file(sig);
eprintln!("heartbeat-launch: exit signal detected, terminating child");
#[cfg(unix)]
{
if let Some(pgid) = master.process_group_leader() {
unsafe {
libc::killpg(pgid, libc::SIGTERM);
}
}
}
let term_deadline = Instant::now() + Duration::from_secs(2);
while Instant::now() < term_deadline {
if let Ok(Some(_)) = child.try_wait() {
break;
}
thread::sleep(Duration::from_millis(50));
}
if child.try_wait().map(|s| s.is_none()).unwrap_or(true) {
eprintln!(
"heartbeat-launch: child did not exit on SIGTERM, sending SIGKILL"
);
#[cfg(unix)]
{
if let Some(pgid) = master.process_group_leader() {
unsafe {
libc::killpg(pgid, libc::SIGKILL);
}
}
}
let _ = killer.kill();
}
exit_sent = true;
}
}
}
if let IdleTick::Exhausted =
tick_idle(&mut idle_state, &last_output, &mut pty_writer)
{
eprintln!(
"heartbeat-launch: idle timeout fired {} time(s) without recovery — killing child",
idle_state.retry_count
);
#[cfg(unix)]
{
if let Some(pgid) = master.process_group_leader() {
unsafe {
libc::killpg(pgid, libc::SIGKILL);
}
}
}
let _ = killer.kill();
thread::sleep(Duration::from_millis(500));
join_reader(reader_thread, &stop, master);
return Err(PtyError::IdleExhausted(idle_state.timeout));
}
thread::sleep(poll_interval);
}
Err(e) => return Err(PtyError::Io(e)),
}
};
join_reader(reader_thread, &stop, master);
let exit_code = if exit_sent { 0 } else { exit_code };
Ok(RunResult { exit_code })
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Directory used both as the child's working directory and as the home
    /// of the signal files.
    fn tmp() -> PathBuf {
        std::env::temp_dir()
    }

    /// Turns string literals into the owned argv that `run` expects.
    #[cfg(unix)]
    fn argv(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| part.to_string()).collect()
    }

    /// A command that succeeds reports exit code 0.
    #[cfg(unix)]
    #[test]
    fn echo_hello_exit_zero() {
        let outcome = run(&argv(&["echo", "hello"]), &tmp(), 10, None, None);
        let result = outcome.expect("run should succeed");
        assert_eq!(result.exit_code, 0, "echo should exit 0");
    }

    /// A failing command's exit code is passed through.
    #[cfg(unix)]
    #[test]
    fn nonzero_exit_code_propagated() {
        let outcome = run(&argv(&["false"]), &tmp(), 10, None, None);
        let result = outcome.expect("run should succeed");
        assert_ne!(result.exit_code, 0, "false should exit non-zero");
    }

    /// A one-second limit on a long sleep yields `PtyError::Timeout(1)`.
    #[cfg(unix)]
    #[test]
    fn timeout_fires() {
        let outcome = run(&argv(&["sleep", "60"]), &tmp(), 1, None, None);
        let err = outcome.expect_err("should time out");
        match err {
            PtyError::Timeout(secs) => assert_eq!(secs, 1),
            other => panic!("expected Timeout, got {other:?}"),
        }
    }

    /// Creating the signal file mid-run terminates the child, reports exit
    /// code 0 and removes the file.
    #[cfg(unix)]
    #[test]
    fn exit_signal_triggers_child_exit() {
        use std::fs;

        let signal_path = tmp().join("test-exit-signal-trigger.tmp");
        let _ = fs::remove_file(&signal_path);

        let path_for_writer = signal_path.clone();
        let writer_thread = thread::spawn(move || {
            thread::sleep(Duration::from_millis(300));
            fs::write(&path_for_writer, b"").expect("write signal file");
        });

        // `read line` blocks on the PTY, so the child only ends via the signal.
        let command = argv(&["sh", "-c", "read line"]);
        let result = run(&command, &tmp(), 10, Some(&signal_path), None)
            .expect("run should succeed");
        writer_thread.join().expect("writer thread panicked");

        assert_eq!(
            result.exit_code, 0,
            "child killed via exit-signal should return exit code 0"
        );
        assert!(
            !signal_path.exists(),
            "signal file should be deleted after SIGTERM is sent"
        );
    }

    /// A signal file left over from an earlier run is removed and does not
    /// affect the result.
    #[cfg(unix)]
    #[test]
    fn stale_signal_file_deleted_before_poll() {
        use std::fs;

        let signal_path = tmp().join("test-exit-signal-stale.tmp");
        fs::write(&signal_path, b"").expect("write stale signal file");
        assert!(
            signal_path.exists(),
            "precondition: stale file should exist"
        );

        let command = argv(&["echo", "hello"]);
        let result = run(&command, &tmp(), 10, Some(&signal_path), None)
            .expect("run should succeed");

        assert_eq!(result.exit_code, 0);
        assert!(
            !signal_path.exists(),
            "stale signal file should be deleted by run() before poll loop"
        );
    }
}