use anda_core::{BoxError, StateFeatures, ToolOutput};
use async_trait::async_trait;
use ic_auth_types::Xid;
use serde_json::json;
use std::{
borrow::Cow,
collections::{BTreeSet, HashMap},
path::PathBuf,
process::{ExitStatus, Output, Stdio},
};
use tokio::{
io::{AsyncRead, AsyncReadExt},
process::Command,
sync::Mutex as TokioMutex,
};
use super::{ExecArgs, ExecOutput, Executor, ShellToolHook};
use crate::{
context::BaseCtx,
hook::{DynToolJsonHook, ToolBackgroundHook, ToolHook},
};
#[cfg(not(test))]
const BACKGROUND_PROGRESS_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
#[cfg(test)]
const BACKGROUND_PROGRESS_INTERVAL: std::time::Duration = std::time::Duration::from_millis(50);
const OUTPUT_READ_CHUNK_BYTES: usize = 8192;
type OutputBuffer = std::sync::Arc<TokioMutex<Vec<u8>>>;
pub struct NativeRuntime {
workspace: PathBuf,
temp_dir: PathBuf,
insecure: bool,
background_progress_interval: std::time::Duration,
}
impl NativeRuntime {
pub fn build_shell_command(command: &str) -> std::process::Command {
#[cfg(not(target_os = "windows"))]
{
let mut process = std::process::Command::new("sh");
process.arg("-c").arg(command);
process
}
#[cfg(target_os = "windows")]
{
let mut process = std::process::Command::new("cmd.exe");
process.arg("/C").arg(command);
process
}
}
pub fn new(workspace: PathBuf) -> Self {
Self {
workspace,
temp_dir: std::env::temp_dir(),
insecure: false,
background_progress_interval: BACKGROUND_PROGRESS_INTERVAL,
}
}
pub fn temp_dir(self, temp_dir: PathBuf) -> Self {
Self { temp_dir, ..self }
}
pub fn insecure(self) -> Self {
Self {
insecure: true,
..self
}
}
pub fn background_progress_interval(self, interval: std::time::Duration) -> Self {
Self {
background_progress_interval: interval,
..self
}
}
pub async fn execute_command(
&self,
ctx: BaseCtx,
tool_name: &str,
command: std::process::Command,
envs: HashMap<String, String>,
args: Option<ExecArgs>,
) -> Result<ExecOutput, BoxError> {
let args = args.unwrap_or_default();
let hook = ctx.get_state::<ShellToolHook>();
let workspace = ctx
.meta()
.get_extra_as::<String>("workspace")
.map(PathBuf::from)
.map(Cow::Owned)
.unwrap_or_else(|| Cow::Borrowed(&self.workspace));
let workspace_str = workspace.to_string_lossy().to_string();
let mut cmd = Command::from(command);
if !self.insecure {
cmd.env_clear();
}
cmd.envs(envs);
cmd.current_dir(workspace.as_ref());
cmd.stdin(Stdio::null());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
cmd.kill_on_drop(true);
let child = match cmd.spawn() {
Ok(child) => child,
Err(err) => {
return Ok(ExecOutput {
workspace: Some(workspace_str),
stderr: Some(format!("Failed to spawn process: {err}")),
..Default::default()
});
}
};
let pid = child.id();
if !args.background {
match child.wait_with_output().await {
Ok(output) => {
let mut exec_output =
ExecOutput::from_output(pid, Some(output), &self.temp_dir).await;
exec_output.workspace = Some(workspace_str);
return Ok(exec_output);
}
Err(err) => {
let exec_output = ExecOutput {
workspace: Some(workspace_str),
process_id: pid,
stderr: Some(format!("Failed to execute background process: {err}")),
..Default::default()
};
return Ok(exec_output);
}
}
}
let task_id = format!("{}:{}", tool_name, Xid::new());
let exec_output = ExecOutput::from_output(
pid,
Some(Output {
status: ExitStatus::default(),
stdout: format!("Background process started with task ID {task_id}").into_bytes(),
stderr: Vec::new(),
}),
&self.temp_dir,
)
.await;
let json_hook = ctx.get_state::<DynToolJsonHook>();
if let Some(hook) = &json_hook {
hook.on_background_start(&ctx, &task_id, json!(&args)).await;
} else if let Some(hook) = &hook {
hook.on_background_start(&ctx, &task_id, &args).await;
}
{
let temp_dir = self.temp_dir.clone();
let background_progress_interval = self.background_progress_interval;
tokio::spawn(async move {
let mut child = child;
let stdout = std::sync::Arc::new(TokioMutex::new(Vec::new()));
let stderr = std::sync::Arc::new(TokioMutex::new(Vec::new()));
let stdout_reader = spawn_output_reader(child.stdout.take(), stdout.clone());
let stderr_reader = spawn_output_reader(child.stderr.take(), stderr.clone());
let mut stdout_progress = ProgressStreamState::default();
let mut stderr_progress = ProgressStreamState::default();
let mut interval = tokio::time::interval(background_progress_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
interval.tick().await;
let wait = child.wait();
tokio::pin!(wait);
let status = loop {
tokio::select! {
status = &mut wait => break status,
_ = interval.tick() => {
if let Some((stdout_chunk, stderr_chunk)) = collect_progress_output(
&stdout,
&stderr,
&mut stdout_progress,
&mut stderr_progress,
).await {
let exec_output = output_chunks_to_exec_output(
pid,
&workspace_str,
stdout_chunk,
stderr_chunk,
);
emit_background_progress(
&ctx,
&task_id,
exec_output,
json_hook.as_ref(),
hook.as_ref(),
).await;
}
}
}
};
let stdout_read_error = output_reader_error(stdout_reader, "stdout").await;
let stderr_read_error = output_reader_error(stderr_reader, "stderr").await;
let stdout_bytes = std::mem::take(&mut *stdout.lock().await);
let mut stderr_bytes = std::mem::take(&mut *stderr.lock().await);
if let Some(err) = stdout_read_error {
append_output_read_error(&mut stderr_bytes, err);
}
if let Some(err) = stderr_read_error {
append_output_read_error(&mut stderr_bytes, err);
}
let exec_output = match status {
Ok(status) => {
let mut exec_output = ExecOutput::from_output(
pid,
Some(Output {
status,
stdout: stdout_bytes,
stderr: stderr_bytes,
}),
&temp_dir,
)
.await;
exec_output.workspace = Some(workspace_str);
exec_output
}
Err(err) => {
let mut error =
format!("Failed to execute background process: {err}").into_bytes();
if !stderr_bytes.is_empty() {
error.push(b'\n');
error.extend_from_slice(&stderr_bytes);
}
output_bytes_to_exec_output(pid, &workspace_str, stdout_bytes, error)
}
};
emit_background_end(
&ctx,
task_id,
exec_output,
json_hook.as_ref(),
hook.as_ref(),
)
.await;
});
}
Ok(exec_output)
}
}
#[async_trait]
impl Executor for NativeRuntime {
fn name(&self) -> &str {
"shell"
}
fn workspace(&self) -> &PathBuf {
&self.workspace
}
fn shell(&self) -> &str {
#[cfg(not(target_os = "windows"))]
{
"sh"
}
#[cfg(target_os = "windows")]
{
"cmd.exe"
}
}
async fn execute(
&self,
ctx: BaseCtx,
input: ExecArgs,
envs: HashMap<String, String>,
) -> Result<ExecOutput, BoxError> {
let cmd = Self::build_shell_command(&input.command);
self.execute_command(ctx, self.name(), cmd, envs, Some(input))
.await
}
}
#[derive(Default)]
struct ProgressStreamState {
sent_len: usize,
terminal: TerminalProgressState,
}
#[derive(Default)]
struct TerminalProgressState {
lines: Vec<String>,
cursor_row: usize,
cursor: usize,
rewrite_mode: bool,
completed_lines: Vec<String>,
dirty_rows: BTreeSet<usize>,
}
#[derive(Clone, Copy, PartialEq, Eq)]
enum ProgressMode {
Plain,
Rewrite,
}
impl ProgressStreamState {
fn next_output(&mut self, output: &[u8]) -> Option<String> {
if output.len() <= self.sent_len {
return None;
}
let unread = &output[self.sent_len..];
let readable_len = complete_utf8_prefix_len(unread);
if readable_len == 0 {
return None;
}
self.sent_len += readable_len;
let text = String::from_utf8_lossy(&unread[..readable_len]);
self.terminal.render(&text)
}
}
impl TerminalProgressState {
fn render(&mut self, text: &str) -> Option<String> {
if has_rewrite_control(text) {
self.rewrite_mode = true;
}
let mode = if self.rewrite_mode {
ProgressMode::Rewrite
} else {
ProgressMode::Plain
};
self.apply_text(text, mode);
match mode {
ProgressMode::Plain => self.take_completed_lines(),
ProgressMode::Rewrite => self.take_dirty_lines(),
}
}
fn apply_text(&mut self, text: &str, mode: ProgressMode) {
self.ensure_cursor_line();
let mut chars = text.chars().peekable();
while let Some(ch) = chars.next() {
match ch {
'\r' => self.cursor = 0,
'\n' => self.newline(mode),
'\x08' => self.move_cursor_left(),
'\x1b' => {
self.apply_escape_sequence(&mut chars);
}
_ => self.write_char(ch),
}
}
}
fn take_completed_lines(&mut self) -> Option<String> {
if self.completed_lines.is_empty() {
return None;
}
let lines = std::mem::take(&mut self.completed_lines);
let output = lines.join("\n");
(!output.is_empty()).then_some(output)
}
fn take_dirty_lines(&mut self) -> Option<String> {
if self.dirty_rows.is_empty() {
return None;
}
let rows = std::mem::take(&mut self.dirty_rows);
let lines = rows
.into_iter()
.filter_map(|row| self.lines.get(row))
.map(|line| line.trim_end_matches(' '))
.filter(|line| !line.is_empty())
.map(ToString::to_string)
.collect::<Vec<_>>();
if lines.is_empty() {
None
} else {
Some(lines.join("\n"))
}
}
fn newline(&mut self, mode: ProgressMode) {
if mode == ProgressMode::Plain {
let line = self.current_line().trim_end_matches(' ').to_string();
self.completed_lines.push(line);
} else {
self.mark_dirty();
}
self.cursor_row += 1;
self.cursor = 0;
self.ensure_cursor_line();
}
fn write_char(&mut self, ch: char) {
self.ensure_cursor_line();
if self.cursor >= self.lines[self.cursor_row].len() {
self.lines[self.cursor_row].push(ch);
self.cursor = self.lines[self.cursor_row].len();
self.mark_dirty();
return;
}
let end = self.lines[self.cursor_row][self.cursor..]
.char_indices()
.nth(1)
.map(|(idx, _)| self.cursor + idx)
.unwrap_or(self.lines[self.cursor_row].len());
let mut buf = [0; 4];
self.lines[self.cursor_row].replace_range(self.cursor..end, ch.encode_utf8(&mut buf));
self.cursor += ch.len_utf8();
self.mark_dirty();
}
fn move_cursor_left(&mut self) {
self.move_cursor_left_by(1);
}
fn move_cursor_left_by(&mut self, count: usize) {
self.ensure_cursor_line();
for _ in 0..count {
if self.cursor == 0 {
return;
}
self.cursor = self.lines[self.cursor_row][..self.cursor]
.char_indices()
.next_back()
.map(|(idx, _)| idx)
.unwrap_or(0);
}
}
fn move_cursor_right_by(&mut self, count: usize) {
self.ensure_cursor_line();
for _ in 0..count {
if self.cursor >= self.lines[self.cursor_row].len() {
return;
}
self.cursor = self.lines[self.cursor_row][self.cursor..]
.char_indices()
.nth(1)
.map(|(idx, _)| self.cursor + idx)
.unwrap_or(self.lines[self.cursor_row].len());
}
}
fn move_cursor_up_by(&mut self, count: usize) {
self.cursor_row = self.cursor_row.saturating_sub(count);
self.clamp_cursor();
}
fn move_cursor_down_by(&mut self, count: usize) {
self.cursor_row = self.cursor_row.saturating_add(count);
self.ensure_cursor_line();
self.clamp_cursor();
}
fn set_cursor_column(&mut self, column: usize) {
self.ensure_cursor_line();
let target_column = column.saturating_sub(1);
let char_count = self.lines[self.cursor_row].chars().count();
if target_column > char_count {
self.lines[self.cursor_row].extend(std::iter::repeat_n(
' ',
target_column.saturating_sub(char_count),
));
self.mark_dirty();
}
self.cursor = byte_index_for_char_column(&self.lines[self.cursor_row], target_column);
}
fn set_cursor_position(&mut self, row: usize, column: usize) {
self.cursor_row = row.saturating_sub(1);
self.ensure_cursor_line();
self.set_cursor_column(column);
}
fn clear_from_cursor(&mut self) {
self.ensure_cursor_line();
self.lines[self.cursor_row].truncate(self.cursor);
self.mark_dirty();
}
fn clear_to_cursor(&mut self) {
self.ensure_cursor_line();
if self.cursor == 0 {
return;
}
self.lines[self.cursor_row].replace_range(0..self.cursor, "");
self.cursor = 0;
self.mark_dirty();
}
fn clear_line(&mut self) {
self.ensure_cursor_line();
self.lines[self.cursor_row].clear();
self.cursor = 0;
self.mark_dirty();
}
fn clear_screen(&mut self) {
self.lines.clear();
self.cursor_row = 0;
self.cursor = 0;
self.completed_lines.clear();
self.dirty_rows.clear();
self.ensure_cursor_line();
}
fn apply_escape_sequence<I>(&mut self, chars: &mut std::iter::Peekable<I>)
where
I: Iterator<Item = char>,
{
match chars.peek() {
Some('[') => {
chars.next();
let mut params = String::new();
for ch in chars.by_ref() {
if ('@'..='~').contains(&ch) {
self.apply_csi_sequence(¶ms, ch);
break;
}
params.push(ch);
}
}
Some(']') => {
chars.next();
while let Some(ch) = chars.next() {
if ch == '\x07' {
break;
}
if ch == '\x1b' && matches!(chars.peek(), Some('\\')) {
chars.next();
break;
}
}
}
_ => {}
}
}
fn apply_csi_sequence(&mut self, params: &str, command: char) {
let values = csi_params(params);
match command {
'A' => self.move_cursor_up_by(csi_param_or(&values, 0, 1)),
'B' => self.move_cursor_down_by(csi_param_or(&values, 0, 1)),
'C' => self.move_cursor_right_by(csi_param_or(&values, 0, 1)),
'D' => self.move_cursor_left_by(csi_param_or(&values, 0, 1)),
'G' => self.set_cursor_column(csi_param_or(&values, 0, 1)),
'H' | 'f' => {
self.set_cursor_position(csi_param_or(&values, 0, 1), csi_param_or(&values, 1, 1))
}
'J' if csi_param_or(&values, 0, 0) == 2 => self.clear_screen(),
'K' => match csi_param_or(&values, 0, 0) {
0 => self.clear_from_cursor(),
1 => self.clear_to_cursor(),
2 => self.clear_line(),
_ => {}
},
_ => {}
}
}
fn ensure_cursor_line(&mut self) {
while self.lines.len() <= self.cursor_row {
self.lines.push(String::new());
}
}
fn clamp_cursor(&mut self) {
self.ensure_cursor_line();
if self.cursor > self.lines[self.cursor_row].len() {
self.cursor = self.lines[self.cursor_row].len();
}
while !self.lines[self.cursor_row].is_char_boundary(self.cursor) && self.cursor > 0 {
self.cursor -= 1;
}
}
fn current_line(&mut self) -> &str {
self.ensure_cursor_line();
&self.lines[self.cursor_row]
}
fn mark_dirty(&mut self) {
self.dirty_rows.insert(self.cursor_row);
}
}
fn spawn_output_reader<R>(
reader: Option<R>,
output: OutputBuffer,
) -> tokio::task::JoinHandle<std::io::Result<()>>
where
R: AsyncRead + Unpin + Send + 'static,
{
tokio::spawn(async move {
let Some(mut reader) = reader else {
return Ok(());
};
let mut chunk = [0; OUTPUT_READ_CHUNK_BYTES];
loop {
let len = reader.read(&mut chunk).await?;
if len == 0 {
return Ok(());
}
output.lock().await.extend_from_slice(&chunk[..len]);
}
})
}
async fn collect_progress_output(
stdout: &OutputBuffer,
stderr: &OutputBuffer,
stdout_progress: &mut ProgressStreamState,
stderr_progress: &mut ProgressStreamState,
) -> Option<(String, String)> {
let stdout_chunk = {
let stdout = stdout.lock().await;
stdout_progress.next_output(&stdout).unwrap_or_default()
};
let stderr_chunk = {
let stderr = stderr.lock().await;
stderr_progress.next_output(&stderr).unwrap_or_default()
};
if stdout_chunk.is_empty() && stderr_chunk.is_empty() {
None
} else {
Some((stdout_chunk, stderr_chunk))
}
}
fn complete_utf8_prefix_len(bytes: &[u8]) -> usize {
if bytes.is_empty() {
return 0;
}
let mut continuation_start = bytes.len();
while continuation_start > 0 && is_utf8_continuation_byte(bytes[continuation_start - 1]) {
continuation_start -= 1;
}
if continuation_start == 0 {
return bytes.len();
}
let lead_index = if continuation_start == bytes.len() {
bytes.len() - 1
} else {
continuation_start - 1
};
let required_len = utf8_sequence_len(bytes[lead_index]);
if required_len > 1 && bytes.len() - lead_index < required_len {
lead_index
} else {
bytes.len()
}
}
fn is_utf8_continuation_byte(byte: u8) -> bool {
byte & 0b1100_0000 == 0b1000_0000
}
fn utf8_sequence_len(byte: u8) -> usize {
if byte & 0b1000_0000 == 0 {
1
} else if byte & 0b1110_0000 == 0b1100_0000 {
2
} else if byte & 0b1111_0000 == 0b1110_0000 {
3
} else if byte & 0b1111_1000 == 0b1111_0000 {
4
} else {
1
}
}
fn has_rewrite_control(text: &str) -> bool {
if text.contains(['\r', '\x08']) {
return true;
}
let mut chars = text.chars().peekable();
while let Some(ch) = chars.next() {
if ch != '\x1b' || !matches!(chars.peek(), Some('[')) {
continue;
}
chars.next();
for ch in chars.by_ref() {
if !('@'..='~').contains(&ch) {
continue;
}
if matches!(ch, 'A' | 'B' | 'C' | 'D' | 'G' | 'H' | 'J' | 'K' | 'f') {
return true;
}
break;
}
}
false
}
fn byte_index_for_char_column(text: &str, column: usize) -> usize {
text.char_indices()
.nth(column)
.map(|(idx, _)| idx)
.unwrap_or(text.len())
}
fn csi_params(params: &str) -> Vec<usize> {
params
.split(';')
.filter_map(|part| {
let digits = part
.chars()
.filter(|ch| ch.is_ascii_digit())
.collect::<String>();
if digits.is_empty() {
None
} else {
digits.parse().ok()
}
})
.collect()
}
fn csi_param_or(values: &[usize], index: usize, default: usize) -> usize {
match values.get(index).copied() {
Some(0) if default != 0 => default,
Some(value) => value,
None => default,
}
}
async fn output_reader_error(
handle: tokio::task::JoinHandle<std::io::Result<()>>,
stream_name: &str,
) -> Option<String> {
match handle.await {
Ok(Ok(())) => None,
Ok(Err(err)) => Some(format!("Failed to read background {stream_name}: {err}")),
Err(err) => Some(format!(
"Failed to join background {stream_name} reader: {err}"
)),
}
}
fn append_output_read_error(stderr: &mut Vec<u8>, err: String) {
if !stderr.is_empty() && !stderr.ends_with(b"\n") {
stderr.push(b'\n');
}
stderr.extend_from_slice(err.as_bytes());
}
fn output_chunks_to_exec_output(
process_id: Option<u32>,
workspace: &str,
stdout: String,
stderr: String,
) -> ExecOutput {
ExecOutput {
workspace: Some(workspace.to_string()),
process_id,
stdout: (!stdout.is_empty()).then_some(stdout),
stderr: (!stderr.is_empty()).then_some(stderr),
..Default::default()
}
}
fn output_bytes_to_exec_output(
process_id: Option<u32>,
workspace: &str,
stdout: Vec<u8>,
stderr: Vec<u8>,
) -> ExecOutput {
output_chunks_to_exec_output(
process_id,
workspace,
String::from_utf8_lossy(&stdout).to_string(),
String::from_utf8_lossy(&stderr).to_string(),
)
}
async fn emit_background_progress(
ctx: &BaseCtx,
task_id: &str,
output: ExecOutput,
json_hook: Option<&DynToolJsonHook>,
hook: Option<&ShellToolHook>,
) {
if let Some(hook) = json_hook {
hook.on_background_progress(ctx, task_id.to_string(), ToolOutput::new(json!(output)))
.await;
return;
}
if let Some(hook) = hook {
hook.on_background_progress(ctx, task_id.to_string(), ToolOutput::new(output))
.await;
}
}
async fn emit_background_end(
ctx: &BaseCtx,
task_id: String,
output: ExecOutput,
json_hook: Option<&DynToolJsonHook>,
hook: Option<&ShellToolHook>,
) {
if let Some(hook) = json_hook {
hook.on_background_end(ctx, task_id, ToolOutput::new(json!(output)))
.await;
return;
}
if let Some(hook) = hook {
hook.on_background_end(ctx, task_id, ToolOutput::new(output))
.await;
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::EngineBuilder;
use std::{
path::Path,
sync::{Arc, Mutex},
time::Duration,
};
use tokio::sync::{mpsc, oneshot};
struct TestTempDir(PathBuf);
impl TestTempDir {
async fn new(prefix: &str) -> Self {
let path =
std::env::temp_dir().join(format!("{prefix}-{:016x}", rand::random::<u64>()));
tokio::fs::create_dir_all(&path).await.unwrap();
Self(path)
}
fn path(&self) -> &Path {
&self.0
}
async fn create_dir(&self, relative: &str) -> PathBuf {
let path = self.0.join(relative);
tokio::fs::create_dir_all(&path).await.unwrap();
path
}
}
impl Drop for TestTempDir {
fn drop(&mut self) {
let _ = std::fs::remove_dir_all(&self.0);
}
}
#[allow(clippy::type_complexity)]
struct TestHook {
sender: Mutex<Option<oneshot::Sender<(String, ToolOutput<ExecOutput>)>>>,
}
impl TestHook {
fn new(sender: oneshot::Sender<(String, ToolOutput<ExecOutput>)>) -> Self {
Self {
sender: Mutex::new(Some(sender)),
}
}
}
#[async_trait]
impl ToolHook<ExecArgs, ExecOutput> for TestHook {
async fn on_background_end(
&self,
_ctx: &BaseCtx,
task_id: String,
output: ToolOutput<ExecOutput>,
) {
if let Some(sender) = self.sender.lock().unwrap().take() {
let _ = sender.send((task_id, output));
}
}
}
#[allow(clippy::type_complexity)]
struct ProgressHook {
progress_sender: mpsc::UnboundedSender<(String, ToolOutput<ExecOutput>)>,
end_sender: Mutex<Option<oneshot::Sender<(String, ToolOutput<ExecOutput>)>>>,
}
impl ProgressHook {
fn new(
progress_sender: mpsc::UnboundedSender<(String, ToolOutput<ExecOutput>)>,
end_sender: oneshot::Sender<(String, ToolOutput<ExecOutput>)>,
) -> Self {
Self {
progress_sender,
end_sender: Mutex::new(Some(end_sender)),
}
}
}
#[async_trait]
impl ToolHook<ExecArgs, ExecOutput> for ProgressHook {
async fn on_background_progress(
&self,
_ctx: &BaseCtx,
task_id: String,
output: ToolOutput<ExecOutput>,
) {
let _ = self.progress_sender.send((task_id, output));
}
async fn on_background_end(
&self,
_ctx: &BaseCtx,
task_id: String,
output: ToolOutput<ExecOutput>,
) {
if let Some(sender) = self.end_sender.lock().unwrap().take() {
let _ = sender.send((task_id, output));
}
}
}
fn foreground_command(runtime: &NativeRuntime, env_name: &str, output_file: &str) -> String {
match runtime.shell() {
"cmd.exe" => format!(
"<nul set /p =%{env_name}% > {output_file} & <nul set /p =done & echo warn 1>&2"
),
_ => format!(
"printf '%s' \"${env_name}\" > {output_file}; printf '%s' 'done'; printf '%s' 'warn' >&2"
),
}
}
fn background_command(runtime: &NativeRuntime) -> String {
match runtime.shell() {
"cmd.exe" => {
"ping 127.0.0.1 -n 2 > nul & <nul set /p =bg-out & echo bg-err 1>&2".to_string()
}
_ => "sleep 0.2; printf '%s' 'bg-out'; printf '%s' 'bg-err' >&2".to_string(),
}
}
fn background_progress_command(runtime: &NativeRuntime) -> String {
match runtime.shell() {
"cmd.exe" => "echo progress-out & echo progress-err 1>&2 & ping 127.0.0.1 -n 2 > nul & <nul set /p =done".to_string(),
_ => "printf '%s\n' 'progress-out'; printf '%s\n' 'progress-err' >&2; sleep 0.5; printf '%s' 'done'".to_string(),
}
}
#[test]
fn progress_stream_waits_for_complete_utf8_sequence() {
let mut state = ProgressStreamState::default();
let mut output = vec![0xe4, 0xb8];
assert_eq!(state.next_output(&output), None);
output.push(0xad);
assert_eq!(state.next_output(&output), None);
output.push(b'\n');
assert_eq!(state.next_output(&output).as_deref(), Some("ä¸"));
}
#[test]
fn progress_stream_emits_complete_plain_lines() {
let mut state = ProgressStreamState::default();
let mut output = b"line 1\npartial".to_vec();
assert_eq!(state.next_output(&output).as_deref(), Some("line 1"));
output.extend_from_slice(b" line\n");
assert_eq!(state.next_output(&output).as_deref(), Some("partial line"));
}
#[test]
fn progress_stream_normalizes_rewritten_terminal_line() {
let mut state = ProgressStreamState::default();
assert_eq!(
state.next_output(b"10%\r20%\r100%").as_deref(),
Some("100%")
);
}
#[test]
fn progress_stream_keeps_rewrite_mode_across_ticks() {
let mut state = ProgressStreamState::default();
let mut output = b"10%\r".to_vec();
assert_eq!(state.next_output(&output).as_deref(), Some("10%"));
output.extend_from_slice(b"20%");
assert_eq!(state.next_output(&output).as_deref(), Some("20%"));
}
#[test]
fn progress_stream_keeps_colored_plain_output_line_based() {
let mut state = ProgressStreamState::default();
let mut output = b"\x1b[31mred\x1b[0m".to_vec();
assert_eq!(state.next_output(&output), None);
output.push(b'\n');
assert_eq!(state.next_output(&output).as_deref(), Some("red"));
}
#[test]
fn progress_stream_handles_ansi_clear_line() {
let mut state = ProgressStreamState::default();
assert_eq!(
state.next_output(b"abcdef\rxy\x1b[K").as_deref(),
Some("xy")
);
}
#[test]
fn progress_stream_handles_backspace_on_utf8_character() {
let mut state = ProgressStreamState::default();
assert_eq!(
state.next_output("ä¸\x08æ–‡".as_bytes()).as_deref(),
Some("æ–‡")
);
}
#[test]
fn progress_stream_reports_all_changed_visible_progress_lines() {
let mut state = ProgressStreamState::default();
assert_eq!(
state
.next_output(b"file-a 10%\nfile-b 20%\x1b[1A\rfile-a 90%\x1b[1B\rfile-b 80%")
.as_deref(),
Some("file-a 90%\nfile-b 80%")
);
}
#[test]
fn new_initializes_paths_and_shell() {
let runtime = NativeRuntime::new(PathBuf::from("/home/anda-native-runtime-tests"));
assert_eq!(runtime.name(), "shell");
assert_eq!(
runtime.workspace(),
&PathBuf::from("/home/anda-native-runtime-tests")
);
}
#[tokio::test(flavor = "current_thread")]
async fn execute_runs_foreground_command_with_envs_and_workspace() {
let ctx = EngineBuilder::new().mock_ctx();
let workspace = TestTempDir::new("anda-native-foreground").await;
let nested_dir = workspace.create_dir("nested").await;
let runtime = NativeRuntime::new(nested_dir.clone());
let env_name = "ANDA_NATIVE_TEST_VALUE";
let output_file = "env.txt";
let mut envs = HashMap::new();
envs.insert(env_name.to_string(), "secret-value".to_string());
let output = runtime
.execute(
ctx.base,
ExecArgs {
command: foreground_command(&runtime, env_name, output_file),
..Default::default()
},
envs,
)
.await
.unwrap();
let written = tokio::fs::read_to_string(nested_dir.join(output_file))
.await
.unwrap();
assert_eq!(written.trim(), "secret-value");
assert!(output.process_id.is_some());
assert!(output.raw_output_path.is_none());
assert_eq!(output.stdout.as_deref().map(str::trim), Some("done"));
assert_eq!(output.stderr.as_deref().map(str::trim), Some("warn"));
}
#[tokio::test(flavor = "current_thread")]
async fn execute_reports_background_output_via_hook() {
let ctx = EngineBuilder::new().mock_ctx();
let workspace = TestTempDir::new("anda-native-background").await;
let (sender, receiver) = oneshot::channel();
let hook = ShellToolHook::new(Arc::new(TestHook::new(sender)));
ctx.base.set_state(hook);
let runtime = NativeRuntime::new(workspace.path().to_path_buf());
let input = ExecArgs {
command: background_command(&runtime),
background: true,
..Default::default()
};
let output = runtime
.execute(ctx.base, input.clone(), HashMap::new())
.await
.unwrap();
assert!(output.process_id.is_some());
assert!(output.exit_status.is_some());
assert!(output.stdout.is_some());
assert!(output.stderr.is_none());
let (
task_id,
ToolOutput {
output: hook_output,
..
},
) = tokio::time::timeout(Duration::from_secs(5), receiver)
.await
.unwrap()
.unwrap();
assert!(task_id.contains("shell"));
assert_eq!(hook_output.process_id, output.process_id);
assert_eq!(hook_output.stdout.as_deref().map(str::trim), Some("bg-out"));
assert_eq!(hook_output.stderr.as_deref().map(str::trim), Some("bg-err"));
}
#[tokio::test(flavor = "current_thread")]
async fn execute_reports_background_progress_via_hook() {
let ctx = EngineBuilder::new().mock_ctx();
let workspace = TestTempDir::new("anda-native-progress").await;
let (progress_sender, mut progress_receiver) = mpsc::unbounded_channel();
let (end_sender, end_receiver) = oneshot::channel();
let hook = ShellToolHook::new(Arc::new(ProgressHook::new(progress_sender, end_sender)));
ctx.base.set_state(hook);
let runtime = NativeRuntime::new(workspace.path().to_path_buf());
let input = ExecArgs {
command: background_progress_command(&runtime),
background: true,
..Default::default()
};
let output = runtime
.execute(ctx.base, input.clone(), HashMap::new())
.await
.unwrap();
assert!(output.process_id.is_some());
assert!(output.exit_status.is_some());
assert!(output.stdout.is_some());
assert!(output.stderr.is_none());
let progress_task_id = tokio::time::timeout(Duration::from_secs(5), async {
let mut saw_stdout = false;
let mut saw_stderr = false;
loop {
let (
task_id,
ToolOutput {
output: progress_output,
..
},
) = progress_receiver.recv().await.unwrap();
assert_eq!(progress_output.process_id, output.process_id);
assert!(progress_output.exit_status.is_none());
if progress_output
.stdout
.as_deref()
.is_some_and(|stdout| stdout.contains("progress-out"))
{
saw_stdout = true;
}
if progress_output
.stderr
.as_deref()
.is_some_and(|stderr| stderr.contains("progress-err"))
{
saw_stderr = true;
}
if saw_stdout && saw_stderr {
break task_id;
}
}
})
.await
.unwrap();
let (
end_task_id,
ToolOutput {
output: hook_output,
..
},
) = tokio::time::timeout(Duration::from_secs(5), end_receiver)
.await
.unwrap()
.unwrap();
assert!(progress_task_id.contains("shell"));
assert_eq!(end_task_id, progress_task_id);
assert_eq!(hook_output.process_id, output.process_id);
assert!(
hook_output
.stdout
.as_deref()
.is_some_and(|stdout| stdout.contains("progress-out") && stdout.contains("done"))
);
assert!(
hook_output
.stderr
.as_deref()
.is_some_and(|stderr| stderr.contains("progress-err"))
);
}
}