use portable_pty::{native_pty_system, CommandBuilder, PtySize};
use std::io::{self, Read, Write};
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
#[cfg(unix)]
use libc;
/// Caller-supplied settings for idle detection and keepalive injection.
#[derive(Debug, Clone)]
pub struct IdleConfig {
/// Seconds without PTY output before a keepalive is injected; `0` disables idle handling.
pub timeout_secs: u64,
/// Text written to the PTY (followed by a newline) as the keepalive.
pub prompt: String,
/// Number of keepalive attempts allowed before the run is reported as idle-exhausted.
pub max_retries: u32,
}
/// Outcome of a child process that ran to completion inside the PTY.
#[derive(Debug)]
pub struct RunResult {
/// Exit code taken from the child's exit status.
pub exit_code: u32,
}
/// Errors returned by the PTY runner.
#[derive(Debug, thiserror::Error)]
pub enum PtyError {
/// The PTY pair could not be opened.
#[error("failed to open PTY: {0}")]
Open(anyhow::Error),
/// The command could not be spawned on the PTY slave.
#[error("failed to spawn command: {0}")]
Spawn(anyhow::Error),
/// A reader for the PTY master could not be cloned.
#[error("failed to clone PTY reader: {0}")]
Reader(anyhow::Error),
/// An I/O operation failed (for example polling the child's status).
#[error("I/O error: {0}")]
Io(#[from] io::Error),
/// The overall deadline elapsed and the child was killed; carries the timeout in seconds.
#[error("timeout: child did not exit within {0}s")]
Timeout(u64),
/// The child stayed silent after the maximum number of keepalives; carries the idle timeout in seconds.
#[error("idle exhausted: no output for {0}s after maximum keepalive retries")]
IdleExhausted(u64),
}
/// Gives the PTY reader thread a short window to finish, then releases the master.
///
/// The thread is polled every 50 ms for up to two seconds. If it finishes in
/// that window it is joined. Otherwise the shared `stop` flag is raised and the
/// handle is dropped, leaving the thread detached. `master` is dropped on both
/// paths, after the join or after the flag has been set.
fn join_reader(
    handle: JoinHandle<()>,
    stop: &Arc<Mutex<bool>>,
    master: Box<dyn portable_pty::MasterPty + Send>,
) {
    let give_up_at = Instant::now() + Duration::from_secs(2);
    while !handle.is_finished() {
        if Instant::now() > give_up_at {
            // Out of patience: ask the reader to stop at its next loop turn.
            if let Ok(mut flag) = stop.lock() {
                *flag = true;
            }
            drop(master);
            return;
        }
        thread::sleep(Duration::from_millis(50));
    }
    let _ = handle.join();
    drop(master);
}
/// Handles produced by `spawn_pty_child` for one spawned command.
struct PtySpawn {
/// Master side of the PTY pair.
master: Box<dyn portable_pty::MasterPty + Send>,
/// The spawned child process.
child: Box<dyn portable_pty::Child + Send>,
/// Killer handle cloned from the child.
killer: Box<dyn portable_pty::ChildKiller + Send>,
/// Reader cloned from the master, used to receive the child's output.
reader: Box<dyn Read + Send>,
}
fn spawn_pty_child(argv: &[String], cwd: &Path) -> Result<PtySpawn, PtyError> {
let pty_system = native_pty_system();
let pair = pty_system
.openpty(PtySize {
rows: 50,
cols: 200,
pixel_width: 0,
pixel_height: 0,
})
.map_err(PtyError::Open)?;
let mut cmd = CommandBuilder::new(&argv[0]);
if argv.len() > 1 {
cmd.args(&argv[1..]);
}
cmd.cwd(cwd);
let child = pair.slave.spawn_command(cmd).map_err(PtyError::Spawn)?;
drop(pair.slave);
let killer = child.clone_killer();
let reader = pair.master.try_clone_reader().map_err(PtyError::Reader)?;
let master = pair.master;
Ok(PtySpawn {
master,
child,
killer,
reader,
})
}
/// Spawns a thread that copies everything read from `reader` to stdout.
///
/// Each non-empty read stores the current time in `last_output_reader` before
/// the bytes are written and flushed. The thread ends on end-of-file, on a
/// read error other than `Interrupted`, on a stdout write failure, or when
/// `stop_reader` is observed to be `true` at the top of an iteration.
fn spawn_basic_reader(
    mut reader: Box<dyn Read + Send>,
    stop_reader: Arc<Mutex<bool>>,
    last_output_reader: Arc<Mutex<Instant>>,
) -> JoinHandle<()> {
    thread::spawn(move || {
        let console = io::stdout();
        let mut chunk = [0u8; 4096];
        // A poisoned stop flag is treated as "not stopped".
        while !matches!(stop_reader.lock(), Ok(flag) if *flag) {
            let filled = match reader.read(&mut chunk) {
                Ok(0) => break,
                Ok(len) => len,
                Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                Err(_) => break,
            };
            if let Ok(mut stamp) = last_output_reader.lock() {
                *stamp = Instant::now();
            }
            let mut sink = console.lock();
            if sink.write_all(&chunk[..filled]).is_err() {
                break;
            }
            let _ = sink.flush();
        }
    })
}
/// Spawns a reader thread that forwards PTY output to stdout and also keeps an
/// ANSI-stripped tail of that output for sentinel matching.
///
/// For every non-empty read the thread:
/// 1. stores the current time in `last_output_reader`,
/// 2. sets `seen_output_reader` to `true`,
/// 3. writes the raw bytes to stdout and flushes,
/// 4. appends the escape-stripped bytes to `sentinel_buf_reader`, trimming the
///    buffer back to `8192 + sentinel_len` bytes once it grows past twice that.
///
/// The thread ends on end-of-file, on a read error other than `Interrupted`,
/// on a stdout write failure, or when `stop_reader` is observed to be `true`
/// at the top of an iteration.
#[cfg(feature = "launch")]
fn spawn_sentinel_reader(
    mut reader: Box<dyn Read + Send>,
    stop_reader: Arc<Mutex<bool>>,
    last_output_reader: Arc<Mutex<Instant>>,
    seen_output_reader: Arc<Mutex<bool>>,
    sentinel_buf_reader: Arc<Mutex<Vec<u8>>>,
    sentinel_len: usize,
) -> JoinHandle<()> {
    thread::spawn(move || {
        let console = io::stdout();
        let mut chunk = [0u8; 4096];
        // A poisoned stop flag is treated as "not stopped".
        while !matches!(stop_reader.lock(), Ok(flag) if *flag) {
            let filled = match reader.read(&mut chunk) {
                Ok(0) => break,
                Ok(len) => len,
                Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                Err(_) => break,
            };

            if let Ok(mut stamp) = last_output_reader.lock() {
                *stamp = Instant::now();
            }
            if let Ok(mut seen) = seen_output_reader.lock() {
                *seen = true;
            }

            let raw = &chunk[..filled];
            let mut sink = console.lock();
            if sink.write_all(raw).is_err() {
                break;
            }
            let _ = sink.flush();

            let stripped = strip_ansi_escapes::strip(raw);
            if let Ok(mut window) = sentinel_buf_reader.lock() {
                window.extend_from_slice(&stripped);
                // Let the buffer grow to twice the retained size before
                // trimming so the front is not shifted on every read.
                let retain = 8192 + sentinel_len;
                if window.len() > retain * 2 {
                    let excess = window.len() - retain;
                    window.drain(..excess);
                }
            }
        }
    })
}
/// Mutable bookkeeping for idle detection during one run.
struct IdleState {
/// Idle timeout in seconds; `0` disables idle handling.
timeout: u64,
/// Text written to the PTY as the keepalive.
prompt: String,
/// Number of keepalive attempts allowed before the run counts as exhausted.
max_retries: u32,
/// Keepalives injected since the counter was last reset.
retry_count: u32,
/// When the most recent keepalive was injected, if any.
last_keepalive: Option<Instant>,
}
/// Seconds after a keepalive injection during which the idle retry counter is never reset.
const KEEPALIVE_GRACE_SECS: u64 = 5;
impl IdleState {
    /// Builds the initial idle state from an optional caller configuration.
    ///
    /// Without a configuration the timeout is `0` (idle handling disabled),
    /// the prompt is `"Continue"` and the retry limit is `3`. The retry
    /// counter always starts at zero with no keepalive recorded.
    fn from_config(idle: Option<&IdleConfig>) -> Self {
        let (timeout, prompt, max_retries) = match idle {
            Some(cfg) => (cfg.timeout_secs, cfg.prompt.clone(), cfg.max_retries),
            None => (0, "Continue".to_string(), 3),
        };
        IdleState {
            timeout,
            prompt,
            max_retries,
            retry_count: 0,
            last_keepalive: None,
        }
    }
}
/// Result of one idle-detection tick.
enum IdleTick {
/// The idle timeout elapsed with the retry counter already at its maximum.
Exhausted,
/// The idle timeout elapsed and a keepalive was injected.
KeepaliveInjected,
/// The retry counter was reset after an earlier keepalive.
Recovered,
/// Nothing to do on this tick.
Ok,
}
/// Runs one step of idle detection and, if needed, injects a keepalive.
///
/// * Returns [`IdleTick::Ok`] immediately when idle handling is disabled
///   (`state.timeout == 0`) or `skip` is set.
/// * When the child has been silent for at least `state.timeout` seconds:
///   returns [`IdleTick::Exhausted`] if `state.max_retries` keepalives were
///   already sent. Otherwise it writes ESC, pauses 500 ms, writes the prompt
///   plus a newline to `pty_writer` (best effort), restarts the silence clock
///   and returns [`IdleTick::KeepaliveInjected`].
/// * When a keepalive is outstanding and the reader has recorded output at
///   least `KEEPALIVE_GRACE_SECS` after it, resets the retry counter and
///   returns [`IdleTick::Recovered`].
///
/// Injection overwrites `last_output` with the injection time, so the mere
/// passage of time must not be read as recovery. Only a timestamp later than
/// the grace window proves that the child itself produced output. Output that
/// lands inside the grace window is ignored on purpose — presumably it is the
/// echo of the injected text.
fn tick_idle(
    state: &mut IdleState,
    last_output: &Arc<Mutex<Instant>>,
    pty_writer: &mut Option<Box<dyn Write + Send>>,
    skip: bool,
) -> IdleTick {
    if state.timeout == 0 || skip {
        return IdleTick::Ok;
    }

    // Snapshot the timestamp once so every decision below sees the same
    // value. A poisoned lock is treated as "no silence observed".
    let last_seen = last_output.lock().ok().map(|ts| *ts);
    let silent = last_seen.map(|ts| ts.elapsed()).unwrap_or(Duration::ZERO);

    if silent >= Duration::from_secs(state.timeout) {
        if state.retry_count >= state.max_retries {
            return IdleTick::Exhausted;
        }
        state.retry_count += 1;
        eprintln!(
            "heartbeat-launch: idle detected ({:.0}s silent) — injecting keepalive (attempt {}/{})",
            silent.as_secs_f64(),
            state.retry_count,
            state.max_retries,
        );
        if let Some(ref mut w) = pty_writer {
            // ESC first, then a short pause before typing the prompt.
            let _ = w.write_all(b"\x1b");
            let _ = w.flush();
            thread::sleep(Duration::from_millis(500));
            let _ = w.write_all(state.prompt.as_bytes());
            let _ = w.write_all(b"\n");
            let _ = w.flush();
        }
        // Use one instant for both fields so that "output after the
        // keepalive" can be measured against it exactly.
        let injected_at = Instant::now();
        state.last_keepalive = Some(injected_at);
        if let Ok(mut ts) = last_output.lock() {
            *ts = injected_at;
        }
        return IdleTick::KeepaliveInjected;
    }

    if state.retry_count == 0 {
        return IdleTick::Ok;
    }

    let grace = Duration::from_secs(KEEPALIVE_GRACE_SECS);
    let recovered = match (state.last_keepalive, last_seen) {
        // Real recovery: the reader stamped output after the grace window.
        (Some(injected_at), Some(seen)) => seen.saturating_duration_since(injected_at) >= grace,
        // No keepalive on record although retries are pending: nothing to
        // wait for, so clear the counter.
        (None, _) => true,
        // Timestamp unreadable (poisoned lock): recovery cannot be proven.
        (Some(_), None) => false,
    };

    if recovered {
        eprintln!(
            "heartbeat-launch: output resumed — resetting idle retry counter (was {})",
            state.retry_count
        );
        state.retry_count = 0;
        state.last_keepalive = None;
        IdleTick::Recovered
    } else {
        IdleTick::Ok
    }
}
pub fn run(
argv: &[String],
cwd: &Path,
timeout_secs: u64,
exit_signal: Option<&Path>,
idle: Option<&IdleConfig>,
) -> Result<RunResult, PtyError> {
let PtySpawn {
master,
mut child,
mut killer,
reader,
} = spawn_pty_child(argv, cwd)?;
let stop = Arc::new(Mutex::new(false));
let last_output: Arc<Mutex<Instant>> = Arc::new(Mutex::new(Instant::now()));
let reader_thread = spawn_basic_reader(reader, Arc::clone(&stop), Arc::clone(&last_output));
if let Some(sig) = exit_signal {
let _ = std::fs::remove_file(sig);
}
let mut pty_writer: Option<Box<dyn Write + Send>> = master.take_writer().ok();
let poll_interval = Duration::from_millis(100);
let deadline = if timeout_secs > 0 {
Some(Instant::now() + Duration::from_secs(timeout_secs))
} else {
None
};
let mut idle_state = IdleState::from_config(idle);
let mut exit_sent = false;
let exit_code = loop {
match child.try_wait() {
Ok(Some(status)) => break status.exit_code(),
Ok(None) => {
if let Some(dl) = deadline {
if Instant::now() >= dl {
#[cfg(unix)]
{
if let Some(pgid) = master.process_group_leader() {
unsafe {
libc::killpg(pgid, libc::SIGKILL);
}
}
}
let _ = killer.kill(); thread::sleep(Duration::from_millis(500));
join_reader(reader_thread, &stop, master);
return Err(PtyError::Timeout(timeout_secs));
}
}
if !exit_sent {
if let Some(sig) = exit_signal {
if sig.exists() {
let _ = std::fs::remove_file(sig);
eprintln!("heartbeat-launch: exit signal detected, terminating child");
#[cfg(unix)]
{
if let Some(pgid) = master.process_group_leader() {
unsafe {
libc::killpg(pgid, libc::SIGTERM);
}
}
}
let term_deadline = Instant::now() + Duration::from_secs(2);
while Instant::now() < term_deadline {
if let Ok(Some(_)) = child.try_wait() {
break;
}
thread::sleep(Duration::from_millis(50));
}
if child.try_wait().map(|s| s.is_none()).unwrap_or(true) {
eprintln!(
"heartbeat-launch: child did not exit on SIGTERM, sending SIGKILL"
);
#[cfg(unix)]
{
if let Some(pgid) = master.process_group_leader() {
unsafe {
libc::killpg(pgid, libc::SIGKILL);
}
}
}
let _ = killer.kill();
}
exit_sent = true;
}
}
}
if let IdleTick::Exhausted =
tick_idle(&mut idle_state, &last_output, &mut pty_writer, false)
{
eprintln!(
"heartbeat-launch: idle timeout fired {} time(s) without recovery — killing child",
idle_state.retry_count
);
#[cfg(unix)]
{
if let Some(pgid) = master.process_group_leader() {
unsafe {
libc::killpg(pgid, libc::SIGKILL);
}
}
}
let _ = killer.kill();
thread::sleep(Duration::from_millis(500));
join_reader(reader_thread, &stop, master);
return Err(PtyError::IdleExhausted(idle_state.timeout));
}
thread::sleep(poll_interval);
}
Err(e) => return Err(PtyError::Io(e)),
}
};
join_reader(reader_thread, &stop, master);
Ok(RunResult { exit_code })
}
/// Settings for queue mode, where entries from a file are fed to the child one at a time.
#[derive(Debug, Clone)]
pub struct QueueConfig {
/// File holding the queue; every non-empty line (after trimming) is one entry.
pub queue_path: std::path::PathBuf,
/// Text searched for in the child's ANSI-stripped output to detect that the current entry is finished.
pub sentinel: String,
/// Seconds of silence required after the child's first output before the first entry is injected.
pub boot_delay_secs: u64,
/// Message template per entry; `{index}`, `{total}` and `{content}` are substituted.
pub entry_template: String,
/// Message written to the child once every entry has been consumed.
pub done_message: String,
}
/// State machine driving queue-mode injection.
#[derive(Debug, Clone, PartialEq)]
enum QueueState {
/// Waiting for the child to produce output and then stay silent for the boot delay.
WaitingForBoot,
/// An entry has been injected; waiting for the sentinel to appear in the output.
WaitForSentinel,
/// Ready to inject the next entry, or to move on to `SendExit` when none remain.
InjectNext,
/// All entries consumed; the done message is about to be written.
SendExit,
/// The done message has been written; only waiting for the child to exit.
Done,
}
/// Runs `argv` inside a PTY and feeds it the entries of a queue file one at a
/// time, waiting for a sentinel in the output between entries.
///
/// The queue file is read up front; each non-empty trimmed line is an entry.
/// Injection follows the `QueueState` machine:
///
/// 1. wait until the child has produced output and then stayed silent for
///    `queue.boot_delay_secs`,
/// 2. write the next entry rendered through `queue.entry_template`,
/// 3. wait for `queue.sentinel` in the ANSI-stripped output, then go to 2,
/// 4. after the last entry, write `queue.done_message` and wait for exit.
///
/// The deadline, exit-signal file and idle supervision work as in `run`,
/// except that idle detection is skipped while waiting for boot.
///
/// # Errors
///
/// Returns [`PtyError::Io`] when the queue file cannot be read (keeping the
/// original [`io::ErrorKind`]) or the child's status cannot be polled, the
/// spawn errors of `spawn_pty_child`, [`PtyError::Timeout`] and
/// [`PtyError::IdleExhausted`].
#[cfg(feature = "launch")]
pub fn run_with_queue(
    argv: &[String],
    cwd: &Path,
    timeout_secs: u64,
    exit_signal: Option<&Path>,
    idle: Option<&IdleConfig>,
    queue: &QueueConfig,
) -> Result<RunResult, PtyError> {
    use std::fs;

    /// Signals the process group reported by the PTY master: SIGKILL when
    /// `force` is set, SIGTERM otherwise. Does nothing on non-Unix targets.
    fn signal_group(master: &(dyn portable_pty::MasterPty + Send), force: bool) {
        #[cfg(unix)]
        {
            if let Some(pgid) = master.process_group_leader() {
                let signal = if force { libc::SIGKILL } else { libc::SIGTERM };
                // SAFETY: `killpg` takes two plain integers and touches no
                // memory owned by this process; `pgid` is the value the PTY
                // master itself reported.
                unsafe {
                    libc::killpg(pgid, signal);
                }
            }
        }
        #[cfg(not(unix))]
        {
            let _ = (master, force);
        }
    }

    /// Force-kills the child, gives it 500 ms to die, reaps it if it has
    /// exited, and shuts the reader thread down.
    fn abort_child(
        master: Box<dyn portable_pty::MasterPty + Send>,
        child: &mut (dyn portable_pty::Child + Send),
        killer: &mut (dyn portable_pty::ChildKiller + Send),
        reader_thread: JoinHandle<()>,
        stop: &Arc<Mutex<bool>>,
    ) {
        signal_group(&*master, true);
        let _ = killer.kill();
        thread::sleep(Duration::from_millis(500));
        // Non-blocking reap so the killed child does not linger as a zombie.
        let _ = child.try_wait();
        join_reader(reader_thread, stop, master);
    }

    // Keep the real error kind (permission denied, invalid data, ...) and
    // only add the path for context.
    let queue_raw = fs::read_to_string(&queue.queue_path).map_err(|e| {
        PtyError::Io(io::Error::new(
            e.kind(),
            format!("queue file {}: {e}", queue.queue_path.display()),
        ))
    })?;
    let entries: Vec<String> = queue_raw
        .lines()
        .map(str::trim)
        .filter(|l| !l.is_empty())
        .map(String::from)
        .collect();
    let total = entries.len();
    eprintln!(
        "heartbeat-launch: queue mode — {total} entries from {}",
        queue.queue_path.display()
    );

    let PtySpawn {
        master,
        mut child,
        mut killer,
        reader,
    } = spawn_pty_child(argv, cwd)?;

    let stop = Arc::new(Mutex::new(false));
    let last_output: Arc<Mutex<Instant>> = Arc::new(Mutex::new(Instant::now()));
    let sentinel_buf: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(Vec::with_capacity(8192)));
    let seen_output: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
    let reader_thread = spawn_sentinel_reader(
        reader,
        Arc::clone(&stop),
        Arc::clone(&last_output),
        Arc::clone(&seen_output),
        Arc::clone(&sentinel_buf),
        queue.sentinel.len(),
    );

    // A signal file left over from an earlier run must not end this one.
    if let Some(sig) = exit_signal {
        let _ = fs::remove_file(sig);
    }

    let mut pty_writer: Option<Box<dyn Write + Send>> = master.take_writer().ok();
    let poll_interval = Duration::from_millis(100);
    let deadline = if timeout_secs > 0 {
        Some(Instant::now() + Duration::from_secs(timeout_secs))
    } else {
        None
    };
    let mut idle_state = IdleState::from_config(idle);
    let mut exit_sent = false;
    let mut queue_state = QueueState::WaitingForBoot;
    let mut queue_index: usize = 0;
    let mut sentinel_matched_at: Option<Instant> = None;
    let boot_delay = Duration::from_secs(queue.boot_delay_secs);

    let exit_code = loop {
        match child.try_wait() {
            Ok(Some(status)) => break status.exit_code(),
            Ok(None) => {}
            Err(e) => return Err(PtyError::Io(e)),
        }

        // Overall deadline.
        if let Some(dl) = deadline {
            if Instant::now() >= dl {
                abort_child(master, &mut *child, &mut *killer, reader_thread, &stop);
                return Err(PtyError::Timeout(timeout_secs));
            }
        }

        // Exit-signal file: graceful termination, escalated after two seconds.
        if !exit_sent {
            if let Some(sig) = exit_signal {
                if sig.exists() {
                    let _ = std::fs::remove_file(sig);
                    eprintln!("heartbeat-launch: exit signal detected, terminating child");
                    signal_group(&*master, false);

                    let term_deadline = Instant::now() + Duration::from_secs(2);
                    while Instant::now() < term_deadline {
                        if let Ok(Some(_)) = child.try_wait() {
                            break;
                        }
                        thread::sleep(Duration::from_millis(50));
                    }

                    // A status-poll error is treated as "still running".
                    if child.try_wait().map(|s| s.is_none()).unwrap_or(true) {
                        eprintln!(
                            "heartbeat-launch: child did not exit on SIGTERM, sending SIGKILL"
                        );
                        signal_group(&*master, true);
                        let _ = killer.kill();
                    }
                    exit_sent = true;
                }
            }
        }

        // Idle detection is suspended until the child has finished booting.
        let skip_idle = queue_state == QueueState::WaitingForBoot;
        if let IdleTick::Exhausted =
            tick_idle(&mut idle_state, &last_output, &mut pty_writer, skip_idle)
        {
            eprintln!(
                "heartbeat-launch: idle timeout fired {} time(s) without recovery — killing child",
                idle_state.retry_count
            );
            abort_child(master, &mut *child, &mut *killer, reader_thread, &stop);
            return Err(PtyError::IdleExhausted(idle_state.timeout));
        }

        // Phase 1: waiting states either advance the machine or poll again.
        match queue_state {
            QueueState::WaitingForBoot => {
                let has_output = seen_output.lock().map(|g| *g).unwrap_or(false);
                if has_output {
                    let silent = last_output
                        .lock()
                        .map(|ts| ts.elapsed())
                        .unwrap_or(Duration::ZERO);
                    if silent >= boot_delay {
                        eprintln!(
                            "heartbeat-launch: boot complete ({:.1}s silence) — ready to inject",
                            silent.as_secs_f64()
                        );
                        queue_state = QueueState::InjectNext;
                    }
                }
                if queue_state == QueueState::WaitingForBoot {
                    thread::sleep(poll_interval);
                    continue;
                }
            }
            QueueState::WaitForSentinel => {
                // Check and clear under one lock so nothing the reader appends
                // in between is discarded, and so a poisoned lock is handled
                // the same way for both steps.
                let found = {
                    let mut buf = sentinel_buf.lock().unwrap_or_else(|e| e.into_inner());
                    let hit = String::from_utf8_lossy(&buf).contains(queue.sentinel.as_str());
                    if hit {
                        buf.clear();
                    }
                    hit
                };
                if found {
                    // 1-based, matching the "injecting entry" message.
                    eprintln!(
                        "heartbeat-launch: sentinel {:?} detected after entry {}",
                        queue.sentinel,
                        queue_index + 1,
                    );
                    sentinel_matched_at = Some(Instant::now());
                    queue_index += 1;
                    queue_state = QueueState::InjectNext;
                } else {
                    thread::sleep(poll_interval);
                    continue;
                }
            }
            QueueState::InjectNext | QueueState::SendExit | QueueState::Done => {}
        }

        // Phase 2: acting states write to the child.
        match queue_state {
            QueueState::InjectNext => {
                // Leave at least 500 ms between a sentinel match and the next
                // injection.
                if let Some(matched_at) = sentinel_matched_at.take() {
                    let elapsed = matched_at.elapsed();
                    let delay = Duration::from_millis(500);
                    if elapsed < delay {
                        thread::sleep(delay - elapsed);
                    }
                }
                if let Some(entry) = entries.get(queue_index) {
                    let msg = queue
                        .entry_template
                        .replace("{index}", &(queue_index + 1).to_string())
                        .replace("{total}", &total.to_string())
                        .replace("{content}", entry);
                    eprintln!(
                        "heartbeat-launch: injecting entry {}/{total}",
                        queue_index + 1,
                    );
                    if let Some(ref mut w) = pty_writer {
                        let _ = w.write_all(msg.as_bytes());
                        let _ = w.write_all(b"\n");
                        let _ = w.flush();
                    }
                    queue_state = QueueState::WaitForSentinel;
                } else {
                    queue_state = QueueState::SendExit;
                }
            }
            QueueState::SendExit => {
                eprintln!(
                    "heartbeat-launch: all {total} entries consumed — sending exit prompt"
                );
                if let Some(ref mut w) = pty_writer {
                    let _ = w.write_all(queue.done_message.as_bytes());
                    let _ = w.write_all(b"\n");
                    let _ = w.flush();
                }
                queue_state = QueueState::Done;
            }
            QueueState::WaitingForBoot | QueueState::WaitForSentinel | QueueState::Done => {}
        }

        thread::sleep(poll_interval);
    };

    join_reader(reader_thread, &stop, master);
    Ok(RunResult { exit_code })
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Directory used both as working directory and as home for signal files.
    fn tmp() -> PathBuf {
        std::env::temp_dir()
    }

    /// Builds an owned argv from string literals.
    #[cfg(unix)]
    fn argv(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| part.to_string()).collect()
    }

    /// A command that succeeds reports exit code zero.
    #[cfg(unix)]
    #[test]
    fn echo_hello_exit_zero() {
        let command = argv(&["echo", "hello"]);
        let outcome = run(&command, &tmp(), 10, None, None).expect("run should succeed");
        assert_eq!(outcome.exit_code, 0, "echo should exit 0");
    }

    /// A failing command's exit code is passed through.
    #[cfg(unix)]
    #[test]
    fn nonzero_exit_code_propagated() {
        let command = argv(&["false"]);
        let outcome = run(&command, &tmp(), 10, None, None).expect("run should succeed");
        assert_ne!(outcome.exit_code, 0, "false should exit non-zero");
    }

    /// A child that outlives the deadline yields `PtyError::Timeout`.
    #[cfg(unix)]
    #[test]
    fn timeout_fires() {
        let command = argv(&["sleep", "60"]);
        let failure = run(&command, &tmp(), 1, None, None).expect_err("should time out");
        match failure {
            PtyError::Timeout(secs) => assert_eq!(secs, 1),
            other => panic!("expected Timeout, got {other:?}"),
        }
    }

    /// Creating the signal file while the child runs terminates the child and
    /// consumes the file.
    #[cfg(unix)]
    #[test]
    fn exit_signal_triggers_child_exit() {
        use std::fs;

        let signal_path = tmp().join("test-exit-signal-trigger.tmp");
        let _ = fs::remove_file(&signal_path);

        // Drop the signal file shortly after the child has started.
        let delayed_target = signal_path.clone();
        let writer_thread = thread::spawn(move || {
            thread::sleep(Duration::from_millis(300));
            fs::write(&delayed_target, b"").expect("write signal file");
        });

        let command = argv(&["sh", "-c", "read line"]);
        let outcome =
            run(&command, &tmp(), 10, Some(&signal_path), None).expect("run should succeed");
        writer_thread.join().expect("writer thread panicked");

        assert_ne!(
            outcome.exit_code, 0,
            "child should exit with signal-death code after SIGTERM"
        );
        assert!(
            !signal_path.exists(),
            "signal file should be deleted after SIGTERM is sent"
        );
    }

    /// A signal file left over from an earlier run is removed at start-up and
    /// does not affect the new run.
    #[cfg(unix)]
    #[test]
    fn stale_signal_file_deleted_before_poll() {
        use std::fs;

        let signal_path = tmp().join("test-exit-signal-stale.tmp");
        fs::write(&signal_path, b"").expect("write stale signal file");
        assert!(
            signal_path.exists(),
            "precondition: stale file should exist"
        );

        let command = argv(&["echo", "hello"]);
        let outcome =
            run(&command, &tmp(), 10, Some(&signal_path), None).expect("run should succeed");

        assert_eq!(outcome.exit_code, 0);
        assert!(
            !signal_path.exists(),
            "stale signal file should be deleted by run() before poll loop"
        );
    }
}