use std::collections::HashMap;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::{
Arc, Mutex as StdMutex,
atomic::{AtomicBool, AtomicI32, Ordering},
};
use std::time::Duration;
use portable_pty::{ChildKiller, CommandBuilder, MasterPty, PtySize, native_pty_system};
use tokio::process::Command as TokioCommand;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use lash_core::{ProgressSender, SandboxMessage};
use crate::output::{
OUTPUT_QUIET_PERIOD_MS, PollOutcome, ProcessState, ShellOutputSpill, activate_spill,
clean_terminal_output, exit_status_code, kill_child, progress_chunk, render_buffer_output,
spawn_async_reader, spawn_reader_thread, spawn_wait_thread, terminate_pipe_process,
truncate_exec_output, wait_for_buffer_settle, wait_for_child_exit,
};
pub(crate) const DEFAULT_EXEC_COMMAND_TIMEOUT_MS: u64 = 10 * 60 * 1000;
const DEFAULT_PTY_SIZE: PtySize = PtySize {
rows: 24,
cols: 80,
pixel_width: 0,
pixel_height: 0,
};
struct ShellProcess {
_master: Box<dyn MasterPty + Send>,
writer: Arc<StdMutex<Option<Box<dyn Write + Send>>>>,
buffer: Arc<StdMutex<Vec<u8>>>,
buffer_start: Arc<StdMutex<usize>>,
truncated: Arc<AtomicBool>,
read_cursor: Arc<StdMutex<usize>>,
exit_code: Arc<StdMutex<Option<i32>>>,
exit_notify: Arc<Notify>,
output_notify: Arc<Notify>,
spill: Arc<StdMutex<Option<ShellOutputSpill>>>,
killer: Arc<StdMutex<Option<Box<dyn ChildKiller + Send + Sync>>>>,
pid: Option<u32>,
}
pub(crate) struct PipeExecProcessRequest<'a> {
pub(crate) id: &'a str,
pub(crate) command: &'a str,
pub(crate) workdir: &'a Path,
pub(crate) login: bool,
pub(crate) shell_path: &'a str,
pub(crate) timeout: Option<Duration>,
pub(crate) progress: Option<&'a ProgressSender>,
pub(crate) max_output_tokens: Option<usize>,
pub(crate) cancel: Option<CancellationToken>,
}
#[derive(Clone, Debug)]
pub(crate) struct CommonCommandParams {
pub(crate) cmd: String,
pub(crate) workdir: PathBuf,
pub(crate) shell_path: String,
pub(crate) login: bool,
pub(crate) allow_nonzero_exit: bool,
pub(crate) max_output_tokens: Option<usize>,
}
#[derive(Clone, Debug)]
pub(crate) struct ExecCommandParams {
pub(crate) cmd: String,
pub(crate) workdir: PathBuf,
pub(crate) shell_path: String,
pub(crate) login: bool,
pub(crate) allow_nonzero_exit: bool,
pub(crate) timeout_ms: u64,
pub(crate) max_output_tokens: Option<usize>,
}
#[derive(Clone, Debug)]
pub(crate) struct StartCommandParams {
pub(crate) cmd: String,
pub(crate) workdir: PathBuf,
pub(crate) shell_path: String,
pub(crate) login: bool,
pub(crate) allow_nonzero_exit: bool,
pub(crate) max_output_tokens: Option<usize>,
}
#[derive(Clone)]
pub(crate) struct ShellRuntime {
pub(crate) shell_path: String,
cwd: PathBuf,
processes: Arc<StdMutex<HashMap<String, ShellProcess>>>,
next_session_id: Arc<AtomicI32>,
}
#[derive(Clone, Copy)]
pub(crate) struct WaitBehavior {
pub(crate) baseline_len: usize,
}
impl ShellRuntime {
pub(crate) fn new() -> Self {
let shell_path = std::env::var("SHELL").unwrap_or_else(|_| "bash".into());
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
Self {
shell_path,
cwd,
processes: Arc::new(StdMutex::new(HashMap::new())),
next_session_id: Arc::new(AtomicI32::new(1)),
}
}
pub(crate) fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
self.cwd = cwd.into();
self
}
fn shell_name(shell_path: &str) -> &str {
shell_path.rsplit('/').next().unwrap_or(shell_path)
}
pub(crate) fn resolve_workdir(&self, workdir: Option<&str>) -> PathBuf {
match workdir {
None => self.cwd.clone(),
Some(path) => {
let path = PathBuf::from(path);
if path.is_absolute() {
path
} else {
self.cwd.join(path)
}
}
}
}
fn command_for_spawn(&self, command: &str, shell_path: &str, pty: bool) -> String {
let shell_name = Self::shell_name(shell_path);
let echo_off = if pty {
"stty -echo 2>/dev/null || true\n"
} else {
""
};
let pipefail_prefix = if command.contains('|') && shell_supports_pipefail(shell_name) {
"set -o pipefail\n"
} else {
""
};
format!("{echo_off}{pipefail_prefix}{command}")
}
fn shell_args(
&self,
command: &str,
login: bool,
shell_path: &str,
pty: bool,
) -> Result<Vec<String>, String> {
let command = self.command_for_spawn(command, shell_path, pty);
if login {
if !shell_supports_login(Self::shell_name(shell_path)) {
return Err(format!(
"Login shell mode is not supported for {}",
Self::shell_name(shell_path)
));
}
Ok(vec!["-l".to_string(), "-c".to_string(), command])
} else {
Ok(vec!["-c".to_string(), command])
}
}
pub(crate) fn spawn_process(
&self,
id: String,
command: &str,
workdir: &Path,
login: bool,
shell_path: &str,
) -> Result<(), String> {
let pty_system = native_pty_system();
let pair = pty_system
.openpty(DEFAULT_PTY_SIZE)
.map_err(|err| format!("Failed to open PTY: {err}"))?;
let mut cmd = CommandBuilder::new(shell_path);
for arg in self.shell_args(command, login, shell_path, true)? {
cmd.arg(arg);
}
cmd.cwd(workdir.as_os_str());
let child = pair.slave.spawn_command(cmd).map_err(|err| {
format!(
"Failed to spawn PTY command with shell `{}` in `{}`: {err}",
shell_path,
workdir.display()
)
})?;
let killer = child.clone_killer();
let pid = child.process_id();
let reader = pair
.master
.try_clone_reader()
.map_err(|err| format!("Failed to clone PTY reader: {err}"))?;
let writer = pair
.master
.take_writer()
.map_err(|err| format!("Failed to take PTY writer: {err}"))?;
drop(pair.slave);
let buffer = Arc::new(StdMutex::new(Vec::new()));
let buffer_start = Arc::new(StdMutex::new(0usize));
let truncated = Arc::new(AtomicBool::new(false));
let read_cursor = Arc::new(StdMutex::new(0usize));
let exit_code = Arc::new(StdMutex::new(None));
let exit_notify = Arc::new(Notify::new());
let output_notify = Arc::new(Notify::new());
let spill = Arc::new(StdMutex::new(None));
let killer = Arc::new(StdMutex::new(Some(killer)));
spawn_reader_thread(
id.clone(),
reader,
Arc::clone(&buffer),
Arc::clone(&buffer_start),
Arc::clone(&truncated),
Arc::clone(&spill),
Arc::clone(&output_notify),
);
spawn_wait_thread(
child,
Arc::clone(&exit_code),
Arc::clone(&exit_notify),
Arc::clone(&output_notify),
);
let process = ShellProcess {
_master: pair.master,
writer: Arc::new(StdMutex::new(Some(writer))),
buffer,
buffer_start,
truncated,
read_cursor,
exit_code,
exit_notify,
output_notify,
spill,
killer,
pid,
};
self.processes.lock().unwrap().insert(id, process);
Ok(())
}
pub(crate) fn allocate_handle_id(&self) -> String {
self.next_session_id
.fetch_add(1, Ordering::SeqCst)
.to_string()
}
fn process_state(&self, id: &str) -> Result<ProcessState, String> {
let procs = self.processes.lock().unwrap();
let proc = procs
.get(id)
.ok_or_else(|| format!("No process with id: {id}"))?;
Ok(ProcessState {
buffer: Arc::clone(&proc.buffer),
buffer_start: Arc::clone(&proc.buffer_start),
exit_code: Arc::clone(&proc.exit_code),
exit_notify: Arc::clone(&proc.exit_notify),
output_notify: Arc::clone(&proc.output_notify),
killer: Arc::clone(&proc.killer),
pid: proc.pid,
})
}
fn take_incremental_output(
&self,
id: &str,
max_output_tokens: Option<usize>,
) -> Result<(String, Option<usize>, Option<PathBuf>), String> {
let (buffer, buffer_start, truncated, read_cursor, spill) = {
let procs = self.processes.lock().unwrap();
let proc = procs
.get(id)
.ok_or_else(|| format!("Unknown session id {id}"))?;
(
Arc::clone(&proc.buffer),
Arc::clone(&proc.buffer_start),
Arc::clone(&proc.truncated),
Arc::clone(&proc.read_cursor),
Arc::clone(&proc.spill),
)
};
let buf = buffer.lock().unwrap();
let start_offset = *buffer_start.lock().unwrap();
let end_offset = start_offset + buf.len();
let mut cursor = read_cursor.lock().unwrap();
let had_gap = *cursor < start_offset;
let start = (*cursor).max(start_offset);
let mut rendered =
String::from_utf8_lossy(&buf[start.saturating_sub(start_offset)..]).to_string();
*cursor = end_offset;
if !rendered.is_empty()
&& (had_gap || truncated.load(Ordering::SeqCst) && *cursor == end_offset)
{
if !rendered.ends_with('\n') {
rendered.push('\n');
}
rendered.push_str("[truncated]");
}
let rendered = clean_terminal_output(&rendered);
let (rendered, original_token_count, token_truncated) =
truncate_exec_output(rendered, max_output_tokens);
let mut spill_guard = spill.lock().unwrap();
let mut full_output_path = spill_guard.as_ref().map(|spill| spill.path.clone());
if token_truncated && full_output_path.is_none() {
full_output_path = activate_spill(id, &buf, &mut spill_guard);
}
Ok((rendered, original_token_count, full_output_path))
}
pub(crate) async fn wait_until_exit_or_timeout(
&self,
id: &str,
timeout: Option<Duration>,
progress: Option<&ProgressSender>,
max_output_tokens: Option<usize>,
behavior: WaitBehavior,
cancel: Option<CancellationToken>,
) -> Result<PollOutcome, String> {
let state = self.process_state(id)?;
let deadline = timeout.map(|value| tokio::time::Instant::now() + value);
let mut sent_len = behavior.baseline_len;
loop {
if let Some(token) = cancel.as_ref()
&& token.is_cancelled()
{
kill_child(&state);
wait_for_child_exit(&state, Duration::from_millis(500)).await;
return Ok(PollOutcome::Cancelled);
}
if let Some(tx) = progress {
let new_chunk = {
let buf = state.buffer.lock().unwrap();
let buffer_start = *state.buffer_start.lock().unwrap();
let buffer_end = buffer_start + buf.len();
if buffer_end > sent_len {
let start = sent_len.max(buffer_start);
let mut chunk =
String::from_utf8_lossy(&buf[start.saturating_sub(buffer_start)..])
.to_string();
if sent_len < buffer_start && !chunk.is_empty() {
if !chunk.ends_with('\n') {
chunk.push('\n');
}
chunk.push_str("[truncated]");
}
sent_len = buffer_end;
Some(clean_terminal_output(&chunk))
} else {
None
}
};
if let Some(chunk) = new_chunk
&& !chunk.is_empty()
{
let _ = tx.send(SandboxMessage {
text: chunk,
kind: "tool_output".into(),
});
}
}
let exited = state.exit_code.lock().unwrap().is_some();
if exited {
wait_for_buffer_settle(&state, Duration::from_millis(OUTPUT_QUIET_PERIOD_MS)).await;
let (output, original_token_count, full_output_path) =
self.take_incremental_output(id, max_output_tokens)?;
let exit_code = state.exit_code.lock().unwrap().unwrap_or(-1);
return Ok(PollOutcome::Exited {
output,
original_token_count,
exit_code,
full_output_path,
});
}
if let Some(dl) = deadline
&& tokio::time::Instant::now() >= dl
{
let exit_code = *state.exit_code.lock().unwrap();
if let Some(exit_code) = exit_code {
wait_for_buffer_settle(&state, Duration::from_millis(OUTPUT_QUIET_PERIOD_MS))
.await;
let (output, original_token_count, full_output_path) =
self.take_incremental_output(id, max_output_tokens)?;
return Ok(PollOutcome::Exited {
output,
original_token_count,
exit_code,
full_output_path,
});
}
let (output, original_token_count, full_output_path) =
self.take_incremental_output(id, max_output_tokens)?;
return Ok(PollOutcome::Running {
output,
original_token_count,
full_output_path,
});
}
let cancel_future = async {
match cancel.as_ref() {
Some(token) => token.cancelled().await,
None => std::future::pending::<()>().await,
}
};
if let Some(wake_at) = deadline {
tokio::select! {
_ = state.exit_notify.notified() => {}
_ = state.output_notify.notified() => {}
_ = tokio::time::sleep_until(wake_at) => {}
_ = cancel_future => {}
}
} else {
tokio::select! {
_ = state.exit_notify.notified() => {}
_ = state.output_notify.notified() => {}
_ = cancel_future => {}
}
}
}
}
pub(crate) fn remove_process(&self, id: &str) {
if let Some(proc) = self.processes.lock().unwrap().remove(id)
&& let Some(mut spill) = proc.spill.lock().unwrap().take()
{
let _ = spill.file.flush();
}
}
pub(crate) async fn write_stdin(&self, id: &str, input: &str) -> Result<(), String> {
let writer = {
let procs = self.processes.lock().unwrap();
let proc = procs
.get(id)
.ok_or_else(|| format!("Unknown session id {id}"))?;
Arc::clone(&proc.writer)
};
let input = input.to_string();
tokio::task::spawn_blocking(move || {
let mut writer = writer.lock().unwrap();
let writer = writer
.as_mut()
.ok_or_else(|| "Process stdin not available".to_string())?;
writer
.write_all(input.as_bytes())
.map_err(|err| format!("Write failed: {err}"))?;
writer.flush().map_err(|err| format!("Flush failed: {err}"))
})
.await
.map_err(|err| format!("Write task failed: {err}"))?
}
pub(crate) async fn close_stdin(&self, id: &str) -> Result<(), String> {
let writer = {
let procs = self.processes.lock().unwrap();
let proc = procs
.get(id)
.ok_or_else(|| format!("Unknown session id {id}"))?;
Arc::clone(&proc.writer)
};
tokio::task::spawn_blocking(move || {
let mut writer = writer.lock().unwrap();
writer.take();
Ok(())
})
.await
.map_err(|err| format!("Close stdin task failed: {err}"))?
}
pub(crate) async fn exec_pipe_process(
&self,
request: PipeExecProcessRequest<'_>,
) -> Result<PollOutcome, String> {
let PipeExecProcessRequest {
id,
command,
workdir,
login,
shell_path,
timeout,
progress,
max_output_tokens,
cancel,
} = request;
let mut cmd = TokioCommand::new(shell_path);
for arg in self.shell_args(command, login, shell_path, false)? {
cmd.arg(arg);
}
cmd.current_dir(workdir)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
#[cfg(unix)]
unsafe {
cmd.pre_exec(|| {
if libc::setsid() == -1 {
return Err(std::io::Error::last_os_error());
}
Ok(())
});
}
let mut child = cmd.spawn().map_err(|err| {
format!(
"Failed to spawn command with shell `{}` in `{}`: {err}",
shell_path,
workdir.display()
)
})?;
let child_pid = child.id();
let stdout = child.stdout.take();
let stderr = child.stderr.take();
let buffer = Arc::new(StdMutex::new(Vec::new()));
let buffer_start = Arc::new(StdMutex::new(0usize));
let truncated = Arc::new(AtomicBool::new(false));
let spill = Arc::new(StdMutex::new(None));
let output_notify = Arc::new(Notify::new());
let mut reader_handles = Vec::new();
if let Some(stdout) = stdout {
reader_handles.push(spawn_async_reader(
id.to_string(),
stdout,
Arc::clone(&buffer),
Arc::clone(&buffer_start),
Arc::clone(&truncated),
Arc::clone(&spill),
Arc::clone(&output_notify),
));
}
if let Some(stderr) = stderr {
reader_handles.push(spawn_async_reader(
id.to_string(),
stderr,
Arc::clone(&buffer),
Arc::clone(&buffer_start),
Arc::clone(&truncated),
Arc::clone(&spill),
Arc::clone(&output_notify),
));
}
let deadline = timeout.map(|value| tokio::time::Instant::now() + value);
let wait_handle = tokio::spawn(async move { child.wait().await });
tokio::pin!(wait_handle);
let mut sent_len = 0usize;
loop {
if let Some(token) = cancel.as_ref()
&& token.is_cancelled()
{
terminate_pipe_process(child_pid);
let _ = tokio::time::timeout(Duration::from_millis(500), &mut wait_handle).await;
wait_for_pipe_readers(&mut reader_handles).await;
return Ok(PollOutcome::Cancelled);
}
if let Some(tx) = progress
&& let Some(chunk) = progress_chunk(&buffer, &buffer_start, &mut sent_len)
&& !chunk.is_empty()
{
let _ = tx.send(SandboxMessage {
text: chunk,
kind: "tool_output".into(),
});
}
if let Some(dl) = deadline
&& tokio::time::Instant::now() >= dl
{
terminate_pipe_process(child_pid);
let _ = tokio::time::timeout(Duration::from_millis(500), &mut wait_handle).await;
wait_for_pipe_readers(&mut reader_handles).await;
let (output, original_token_count, full_output_path) = render_buffer_output(
id,
&buffer,
&buffer_start,
Arc::as_ref(&truncated),
&spill,
max_output_tokens,
);
return Ok(PollOutcome::Running {
output,
original_token_count,
full_output_path,
});
}
let cancel_future = async {
match cancel.as_ref() {
Some(token) => token.cancelled().await,
None => std::future::pending::<()>().await,
}
};
if let Some(wake_at) = deadline {
tokio::select! {
status = &mut wait_handle => {
let exit_code = status
.map_err(|err| format!("Wait task failed: {err}"))?
.map(exit_status_code)
.unwrap_or(-1);
wait_for_pipe_readers(&mut reader_handles).await;
let (output, original_token_count, full_output_path) = render_buffer_output(
id,
&buffer,
&buffer_start,
Arc::as_ref(&truncated),
&spill,
max_output_tokens,
);
return Ok(PollOutcome::Exited {
output,
original_token_count,
exit_code,
full_output_path,
});
}
_ = output_notify.notified() => {}
_ = tokio::time::sleep_until(wake_at) => {}
_ = cancel_future => {}
}
} else {
tokio::select! {
status = &mut wait_handle => {
let exit_code = status
.map_err(|err| format!("Wait task failed: {err}"))?
.map(exit_status_code)
.unwrap_or(-1);
wait_for_pipe_readers(&mut reader_handles).await;
let (output, original_token_count, full_output_path) = render_buffer_output(
id,
&buffer,
&buffer_start,
Arc::as_ref(&truncated),
&spill,
max_output_tokens,
);
return Ok(PollOutcome::Exited {
output,
original_token_count,
exit_code,
full_output_path,
});
}
_ = output_notify.notified() => {}
_ = cancel_future => {}
}
}
}
}
}
async fn wait_for_pipe_readers(handles: &mut Vec<tokio::task::JoinHandle<()>>) {
for handle in handles.drain(..) {
let _ = tokio::time::timeout(Duration::from_millis(500), handle).await;
}
}
fn shell_supports_pipefail(shell_name: &str) -> bool {
matches!(shell_name, "bash" | "zsh" | "ksh" | "mksh")
}
fn shell_supports_login(shell_name: &str) -> bool {
matches!(shell_name, "bash" | "zsh" | "ksh" | "mksh" | "fish")
}