use std::collections::VecDeque;
use std::os::fd::{AsRawFd, BorrowedFd, OwnedFd};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Condvar, LazyLock, Mutex};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use anyhow::{Context, Result, bail};
use nix::poll::{PollFd, PollFlags, poll};
use nix::pty::{OpenptyResult, openpty};
use nix::sys::signal::Signal;
use nix::unistd::{ForkResult, Pid, close, dup2, execvp, fork, setsid};
use regex::Regex;
/// Lazily compiled pattern used by `strip_ansi` to remove terminal escape sequences.
///
/// Matches CSI sequences of the form `ESC [ <digits and semicolons> <ASCII letter>`.
/// The second and third alternatives (`ESC[K`, `ESC[2K`) are already covered by the
/// first one, so they never change what is matched.
///
/// NOTE(review): sequences whose parameters contain other characters (for example
/// `ESC[?25l`) and non-CSI escapes are not matched by this pattern.
static ANSI_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"\x1b\[[0-9;]*[A-Za-z]|\x1b\[K|\x1b\[2K").unwrap());
/// Message sent from the PTY reader thread (`reader_loop`) to the command side over
/// the mpsc channel.
pub enum PtyEvent {
// Raw bytes read from the PTY master; ANSI sequences are still present.
Data(Vec<u8>),
// Sent right after the buffered data once the prompt pattern matched the
// ANSI-stripped buffer.
Prompt,
// Sent when reading the PTY hit end-of-file or a non-EINTR poll/read error.
Exit,
}
/// Category tag stored on every [`EventEntry`] in the event log.
///
/// Within this file the PTY reader only records `Output`, `Prompt` and `Exit`;
/// `Stop` and `Stdout` are presumably produced by other components.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EventKind {
    Output,
    Prompt,
    Exit,
    Stop,
    Stdout,
}

impl EventKind {
    /// Every variant, in declaration order; `parse` searches this table.
    const ALL: [EventKind; 5] = [
        Self::Output,
        Self::Prompt,
        Self::Exit,
        Self::Stop,
        Self::Stdout,
    ];

    /// Returns the lowercase wire name of this kind.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Output => "output",
            Self::Prompt => "prompt",
            Self::Exit => "exit",
            Self::Stop => "stop",
            Self::Stdout => "stdout",
        }
    }

    /// Inverse of [`as_str`](Self::as_str): maps an exact, case-sensitive name back
    /// to its kind, or `None` when the name is unknown.
    pub fn parse(s: &str) -> Option<Self> {
        Self::ALL.iter().copied().find(|kind| kind.as_str() == s)
    }
}
/// One record in the event log.
#[derive(Clone, Debug)]
pub struct EventEntry {
// Sequence number assigned by `EventLog::push`; the first entry gets 1 and each
// later entry gets the previous value plus one.
pub seq: u64,
// Milliseconds elapsed between creation of the log and this push.
pub ts_ms: u64,
// Category of the event.
pub kind: EventKind,
// Payload bytes; the PTY reader pushes an empty vector for prompt/exit markers.
pub bytes: Vec<u8>,
}
/// Maximum number of entries an [`EventLog`] retains; the oldest entry is evicted
/// when a push would exceed this.
const MAX_EVENTS: usize = 2048;

/// Bounded in-memory ring of events, ordered by strictly increasing sequence number.
struct EventLog {
    /// Retained entries, oldest first. `push` is the only writer, so `seq` is
    /// strictly increasing from front to back.
    entries: VecDeque<EventEntry>,
    /// Sequence number of the most recently pushed entry (0 before the first push).
    /// Keeps counting even after old entries have been evicted.
    last_seq: u64,
    /// Creation time of the log; entry timestamps are relative to it.
    started: Instant,
}

impl EventLog {
    /// Creates an empty log whose clock starts now.
    fn new() -> Self {
        Self {
            entries: VecDeque::with_capacity(MAX_EVENTS),
            last_seq: 0,
            started: Instant::now(),
        }
    }

    /// Appends an event, assigning it the next sequence number and a timestamp
    /// relative to `started`, evicting the oldest entry if the log is full.
    fn push(&mut self, kind: EventKind, bytes: Vec<u8>) {
        self.last_seq += 1;
        // Truncation would need ~584 million years of uptime; the cast is fine.
        let ts_ms = self.started.elapsed().as_millis() as u64;
        if self.entries.len() >= MAX_EVENTS {
            self.entries.pop_front();
        }
        self.entries.push_back(EventEntry {
            seq: self.last_seq,
            ts_ms,
            kind,
            bytes,
        });
    }

    /// Returns clones of every retained entry whose `seq` is greater than `since`,
    /// oldest first.
    ///
    /// Entries are sorted by `seq`, so the first newer entry is located with a
    /// binary search instead of scanning the whole ring; callers hold the log
    /// mutex while this runs.
    fn since(&self, since: u64) -> Vec<EventEntry> {
        let first_newer = self.entries.partition_point(|entry| entry.seq <= since);
        self.entries.range(first_newer..).cloned().collect()
    }
}
/// Cloneable, thread-safe handle to a shared event log.
///
/// All clones refer to the same log; the paired condition variable lets
/// [`since_wait`](Self::since_wait) block until a new event is pushed.
#[derive(Clone)]
pub struct LogHandle(Arc<(Mutex<EventLog>, Condvar)>);

impl LogHandle {
    /// Creates a handle to a fresh, empty log.
    pub fn new() -> Self {
        let shared = (Mutex::new(EventLog::new()), Condvar::new());
        Self(Arc::new(shared))
    }

    /// Appends an event and wakes every thread blocked in `since_wait`.
    ///
    /// # Panics
    /// Panics if the log mutex is poisoned.
    pub fn push(&self, kind: EventKind, bytes: Vec<u8>) {
        let (log, wakeup) = &*self.0;
        {
            // Scope the guard so the lock is released before waiters are notified.
            let mut state = log.lock().unwrap();
            state.push(kind, bytes);
        }
        wakeup.notify_all();
    }

    /// Returns the retained entries with a sequence number greater than `since`.
    ///
    /// # Panics
    /// Panics if the log mutex is poisoned.
    pub fn since(&self, since: u64) -> Vec<EventEntry> {
        let (log, _) = &*self.0;
        log.lock().unwrap().since(since)
    }

    /// Returns the sequence number of the most recently pushed entry.
    ///
    /// # Panics
    /// Panics if the log mutex is poisoned.
    pub fn last_seq(&self) -> u64 {
        let (log, _) = &*self.0;
        let state = log.lock().unwrap();
        state.last_seq
    }

    /// Like [`since`](Self::since), but first blocks for up to `timeout` while no
    /// entry newer than `since` has been pushed. Returns whatever is available
    /// when the wait ends (possibly nothing on timeout).
    ///
    /// # Panics
    /// Panics if the log mutex is poisoned.
    pub fn since_wait(&self, since: u64, timeout: Duration) -> Vec<EventEntry> {
        let (log, wakeup) = &*self.0;
        let nothing_new = |state: &mut EventLog| state.last_seq <= since;
        let (state, _timed_out) = wakeup
            .wait_timeout_while(log.lock().unwrap(), timeout, nothing_new)
            .unwrap();
        state.since(since)
    }
}
/// I/O interface to a running debugger session.
///
/// `DebuggerProcess` implements this trait in this file by forwarding each required
/// method to its inherent method of the same name. The per-method notes below
/// describe that implementation; other implementors may differ.
pub trait DebuggerIo: Send + Sync {
// Sends `cmd` and returns the debugger's response text, waiting at most `timeout`
// for the next prompt.
fn send_and_wait(&self, cmd: &str, timeout: Duration) -> Result<String>;
// Returns output that arrived without being requested, or `None` when there is none.
fn drain_pending(&self) -> Option<String>;
// Blocks until a prompt is seen (at most `timeout`) and returns the output that
// preceded it.
fn wait_for_prompt(&self, timeout: Duration) -> Result<String>;
// Returns a handle to the session's event log.
fn log(&self) -> LogHandle;
// Returns the PID of the debugger process.
fn child_pid(&self) -> Pid;
// Reports whether the debugger process is still running.
fn is_alive(&self) -> bool;
// Asks the debugger to exit by sending `quit_cmd`.
fn quit(&self, quit_cmd: &str);
// Optional hook; the default implementation reports no pending hit.
// NOTE(review): the meaning of a returned `HitEvent` is defined in
// `crate::backend::canonical`, which is not visible here.
fn pending_hit(&self) -> Option<crate::backend::canonical::HitEvent> {
None
}
// Optional hook for handling a canonical request directly; the default returns
// `None`. Presumably `None` tells the caller to fall back to the text command
// path — confirm against the call site.
fn dispatch_structured(
&self,
_req: &crate::backend::canonical::CanonicalReq,
_timeout: Duration,
) -> Option<Result<String>> {
None
}
}
/// A debugger running as a forked child attached to a pseudo-terminal, plus the
/// background thread that reads its output.
pub struct DebuggerProcess {
// Master side of the PTY: commands are written to it, and the reader thread reads
// from its raw descriptor.
master: OwnedFd,
// PID of the forked child that exec'd the debugger.
child_pid: Pid,
// Receiving end of the channel fed by the reader thread. The mutex is held for the
// whole of each receive loop in the methods that consume events.
rx: Mutex<Receiver<PtyEvent>>,
// Event log shared with the reader thread.
log: LogHandle,
// Set on drop to ask the reader thread to stop.
shutdown: Arc<AtomicBool>,
// Handle of the reader thread; an `Option` so `Drop` can take and join it.
reader: Option<JoinHandle<()>>,
// Compiled prompt pattern, used to remove prompt text from command responses.
prompt_re: Regex,
}
impl DebuggerProcess {
pub fn spawn(
bin: &str,
args: &[String],
env_extra: &[(String, String)],
prompt_pattern: &str,
) -> Result<Self> {
let OpenptyResult { master, slave } = openpty(None, None)?;
let fork_result = unsafe { fork() }?;
match fork_result {
ForkResult::Child => {
drop(master);
setsid().ok();
let slave_fd = slave.as_raw_fd();
dup2(slave_fd, 0).ok();
dup2(slave_fd, 1).ok();
dup2(slave_fd, 2).ok();
if slave_fd > 2 {
close(slave_fd).ok();
}
unsafe {
for (k, v) in env_extra {
std::env::set_var(k, v);
}
std::env::set_var("TERM", "dumb");
}
let c_bin =
std::ffi::CString::new(bin).unwrap_or_else(|_| std::process::exit(127));
let mut c_args = vec![c_bin.clone()];
for a in args {
c_args.push(
std::ffi::CString::new(a.as_str())
.unwrap_or_else(|_| std::process::exit(127)),
);
}
execvp(&c_bin, &c_args).ok();
std::process::exit(127);
}
ForkResult::Parent { child } => {
drop(slave);
let prompt_re =
Regex::new(prompt_pattern).context("invalid prompt pattern")?;
let reader_prompt_re = prompt_re.clone();
let master_fd = master.as_raw_fd();
let (tx, rx) = mpsc::channel::<PtyEvent>();
let shutdown = Arc::new(AtomicBool::new(false));
let reader_shutdown = shutdown.clone();
let log = LogHandle::new();
let reader_log = log.clone();
let reader = std::thread::Builder::new()
.name("dbg-pty-reader".into())
.spawn(move || {
reader_loop(
master_fd,
reader_prompt_re,
tx,
reader_shutdown,
reader_log,
)
})
.context("failed to spawn reader thread")?;
Ok(Self {
master,
child_pid: child,
rx: Mutex::new(rx),
log,
shutdown,
reader: Some(reader),
prompt_re,
})
}
}
}
fn write_master(&self, data: &[u8]) -> Result<()> {
let fd = self.master.as_raw_fd();
let mut written = 0;
while written < data.len() {
match nix::unistd::write(
unsafe { BorrowedFd::borrow_raw(fd) },
&data[written..],
) {
Ok(n) => written += n,
Err(nix::errno::Errno::EINTR) => continue,
Err(e) => return Err(e.into()),
}
}
Ok(())
}
pub fn drain_pending(&self) -> Option<String> {
let rx = self.rx.lock().unwrap();
let mut accumulated: Vec<u8> = Vec::new();
let mut saw_data = false;
loop {
match rx.try_recv() {
Ok(PtyEvent::Data(bytes)) => {
saw_data = true;
accumulated.extend(bytes);
}
Ok(PtyEvent::Prompt) => {}
Ok(PtyEvent::Exit) => break,
Err(_) => break,
}
}
if !saw_data {
return None;
}
Some(strip_ansi(&String::from_utf8_lossy(&accumulated)))
}
pub fn wait_for_prompt(&self, timeout: Duration) -> Result<String> {
let rx = self.rx.lock().unwrap();
let mut collected: Vec<u8> = Vec::new();
let deadline = Instant::now() + timeout;
loop {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
bail!("timeout waiting for initial prompt");
}
match rx.recv_timeout(remaining) {
Ok(PtyEvent::Data(bytes)) => collected.extend(bytes),
Ok(PtyEvent::Prompt) => {
return Ok(strip_ansi(&String::from_utf8_lossy(&collected)));
}
Ok(PtyEvent::Exit) => bail!("debugger exited before producing prompt"),
Err(RecvTimeoutError::Timeout) => {
bail!("timeout waiting for initial prompt")
}
Err(RecvTimeoutError::Disconnected) => {
bail!("reader thread died before initial prompt")
}
}
}
}
pub fn send_and_wait(&self, cmd: &str, timeout: Duration) -> Result<String> {
if !self.is_alive() {
return Ok("(debuggee has exited — live inspection is over, but captured state is \
still available: `dbg hits <loc>`, `dbg stack`, `dbg locals`, `dbg cross <sym>`, \
`dbg sessions`. Start a fresh session with `dbg start` when ready.)".to_string());
}
if let Err(e) = self.write_master(format!("{cmd}\n").as_bytes()) {
if !self.is_alive() {
return Ok("(debuggee has exited — live inspection is over, but captured state is \
still available: `dbg hits <loc>`, `dbg stack`, `dbg locals`, `dbg cross <sym>`, \
`dbg sessions`. Start a fresh session with `dbg start` when ready.)".to_string());
}
return Err(e);
}
let rx = self.rx.lock().unwrap();
let mut collected: Vec<u8> = Vec::new();
let deadline = Instant::now() + timeout;
loop {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
bail!("timeout waiting for prompt");
}
match rx.recv_timeout(remaining) {
Ok(PtyEvent::Data(bytes)) => collected.extend(bytes),
Ok(PtyEvent::Prompt) => break,
Ok(PtyEvent::Exit) => break,
Err(RecvTimeoutError::Timeout) => bail!("timeout waiting for prompt"),
Err(RecvTimeoutError::Disconnected) => {
return Ok(
"(debuggee has exited — live inspection is over, but captured state is \
still available: `dbg hits <loc>`, `dbg stack`, `dbg locals`, `dbg cross <sym>`, \
`dbg sessions`. Start a fresh session with `dbg start` when ready.)".to_string(),
);
}
}
}
let raw = String::from_utf8_lossy(&collected).to_string();
let clean = strip_ansi(&raw);
let no_prompts = self.prompt_re.replace_all(&clean, "");
let lines: Vec<&str> = no_prompts.lines().collect();
let start = if !lines.is_empty() && lines[0].contains(cmd.trim()) {
1
} else {
0
};
let mut end = lines.len();
while end > start && lines[end - 1].trim().is_empty() {
end -= 1;
}
Ok(lines[start..end].join("\n").trim().to_string())
}
pub fn log(&self) -> LogHandle {
self.log.clone()
}
pub fn child_pid(&self) -> Pid {
self.child_pid
}
pub fn is_alive(&self) -> bool {
nix::sys::wait::waitpid(self.child_pid, Some(nix::sys::wait::WaitPidFlag::WNOHANG))
.is_ok_and(|s| matches!(s, nix::sys::wait::WaitStatus::StillAlive))
}
pub fn quit(&self, quit_cmd: &str) {
if self.is_alive() {
let _ = self.write_master(format!("{quit_cmd}\n").as_bytes());
std::thread::sleep(Duration::from_millis(500));
if self.is_alive() {
let _ = nix::sys::signal::kill(self.child_pid, Signal::SIGKILL);
}
}
}
}
/// Exposes [`DebuggerProcess`] through the [`DebuggerIo`] trait.
///
/// Every method forwards to the inherent method of the same name (inherent
/// associated functions take precedence over trait methods in `Self::name` paths).
/// The two optional hooks keep the trait's default implementations.
impl DebuggerIo for DebuggerProcess {
    fn send_and_wait(&self, cmd: &str, timeout: Duration) -> Result<String> {
        Self::send_and_wait(self, cmd, timeout)
    }

    fn drain_pending(&self) -> Option<String> {
        Self::drain_pending(self)
    }

    fn wait_for_prompt(&self, timeout: Duration) -> Result<String> {
        Self::wait_for_prompt(self, timeout)
    }

    fn log(&self) -> LogHandle {
        Self::log(self)
    }

    fn child_pid(&self) -> Pid {
        Self::child_pid(self)
    }

    fn is_alive(&self) -> bool {
        Self::is_alive(self)
    }

    fn quit(&self, quit_cmd: &str) {
        Self::quit(self, quit_cmd)
    }
}
/// Body of the PTY reader thread.
///
/// Polls `master_fd` with a 100 ms timeout, accumulates what it reads, and forwards
/// it both to the event log and to `tx`:
/// * when the ANSI-stripped buffer matches `prompt_re`, the buffer is flushed as
///   output and followed by a prompt marker;
/// * when the buffer grows past 64 KiB without a prompt, it is flushed on its own;
/// * on end-of-file or a non-`EINTR` poll/read error, the buffer is flushed,
///   followed by an exit marker, and the loop ends.
///
/// The loop also ends when `shutdown` is set (checked before each poll) or when the
/// receiving side of `tx` has been dropped.
fn reader_loop(
    master_fd: std::os::fd::RawFd,
    prompt_re: Regex,
    tx: Sender<PtyEvent>,
    shutdown: Arc<AtomicBool>,
    log: LogHandle,
) {
    /// Amount of prompt-less output that forces a flush.
    const FLUSH_THRESHOLD: usize = 64 * 1024;

    /// Moves the buffered bytes into the log and the channel. Returns `false` only
    /// when the channel's receiver is gone; an empty buffer is a successful no-op.
    fn flush_output(buffered: &mut Vec<u8>, tx: &Sender<PtyEvent>, log: &LogHandle) -> bool {
        if buffered.is_empty() {
            return true;
        }
        let payload = std::mem::take(buffered);
        log.push(EventKind::Output, payload.clone());
        tx.send(PtyEvent::Data(payload)).is_ok()
    }

    /// Records a payload-free marker in the log and sends the matching event.
    /// Only `Prompt` and `Exit` are valid kinds here.
    fn emit_marker(kind: EventKind, tx: &Sender<PtyEvent>, log: &LogHandle) -> bool {
        log.push(kind, Vec::new());
        let event = match kind {
            EventKind::Prompt => PtyEvent::Prompt,
            EventKind::Exit => PtyEvent::Exit,
            EventKind::Output | EventKind::Stop | EventKind::Stdout => {
                unreachable!("emit_marker called with {kind:?}")
            }
        };
        tx.send(event).is_ok()
    }

    let mut chunk = [0u8; 4096];
    let mut buffered: Vec<u8> = Vec::new();

    while !shutdown.load(Ordering::Relaxed) {
        // SAFETY: the caller must keep `master_fd` open for as long as this thread
        // runs; `DebuggerProcess` joins the thread in `Drop` before its `master`
        // field is dropped.
        let borrowed = unsafe { BorrowedFd::borrow_raw(master_fd) };
        let mut fds = [PollFd::new(borrowed, PollFlags::POLLIN)];

        // A poll timeout or an interrupted poll just restarts the loop; a poll
        // failure is funnelled into the same path as a read failure.
        let outcome = match poll(&mut fds, 100u16) {
            Ok(0) | Err(nix::errno::Errno::EINTR) => continue,
            Ok(_) => nix::unistd::read(master_fd, &mut chunk),
            Err(err) => Err(err),
        };
        let len = match outcome {
            Ok(len) if len > 0 => len,
            Err(nix::errno::Errno::EINTR) => continue,
            Ok(_) | Err(_) => {
                // End-of-file or a hard error: hand over what is left, then signal exit.
                let _ = flush_output(&mut buffered, &tx, &log);
                let _ = emit_marker(EventKind::Exit, &tx, &log);
                return;
            }
        };

        buffered.extend_from_slice(&chunk[..len]);
        let saw_prompt = prompt_re.is_match(&strip_ansi(&String::from_utf8_lossy(&buffered)));
        let receiver_alive = if saw_prompt {
            flush_output(&mut buffered, &tx, &log) && emit_marker(EventKind::Prompt, &tx, &log)
        } else if buffered.len() > FLUSH_THRESHOLD {
            flush_output(&mut buffered, &tx, &log)
        } else {
            true
        };
        if !receiver_alive {
            return;
        }
    }
}
/// Returns `s` with every match of `ANSI_RE` removed.
///
/// Input without an ESC byte cannot contain a match, so it is copied without
/// running the regex.
fn strip_ansi(s: &str) -> String {
    if s.contains('\x1b') {
        ANSI_RE.replace_all(s, "").into_owned()
    } else {
        s.to_owned()
    }
}
impl Drop for DebuggerProcess {
fn drop(&mut self) {
self.shutdown.store(true, Ordering::Relaxed);
let _ = nix::sys::signal::kill(self.child_pid, Signal::SIGTERM);
if let Some(h) = self.reader.take() {
let _ = h.join();
}
}
}