pub mod ansi;
pub mod events;
#[doc(hidden)]
pub mod macros;
pub mod pipeline;
pub mod quote;
pub mod state;
pub mod stream;
pub mod trace;
pub mod commands;
pub mod shell_parser;
pub mod utils;
use std::collections::HashMap;
use std::path::PathBuf;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::mpsc;
pub use commands::{CommandContext, StreamChunk};
pub use shell_parser::{needs_real_shell, parse_shell_command, ParsedCommand};
pub use utils::{CommandResult, VirtualUtils};
pub use ansi::{AnsiConfig, AnsiUtils};
pub use events::{EventData, EventType, StreamEmitter};
pub use pipeline::{Pipeline, PipelineBuilder, PipelineExt};
pub use quote::quote;
pub use state::{
get_shell_settings, global_state, reset_global_state, set_shell_option, unset_shell_option,
GlobalState, ShellSettings,
};
pub use stream::{AsyncIterator, IntoStream, OutputChunk, OutputStream, StreamingRunner};
pub use trace::trace;
/// Errors produced by this crate's command-running API.
///
/// `Display` text for each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// An underlying I/O failure (for example, spawning or waiting on a child
/// process). `#[from]` lets `?` convert a `std::io::Error` into this variant.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// A command failed; carries the exit `code` and a descriptive `message`.
#[error("Command failed with exit code {code}: {message}")]
CommandFailed { code: i32, message: String },
/// A command could not be located. The payload is presumably the command
/// name -- confirm at the construction sites.
#[error("Command not found: {0}")]
CommandNotFound(String),
/// A command string could not be parsed; the payload is the parser message.
#[error("Parse error: {0}")]
ParseError(String),
/// The operation was cancelled.
#[error("Cancelled")]
Cancelled,
}
/// Crate-wide result alias whose error type is [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Per-command settings consumed by [`ProcessRunner`].
#[derive(Debug, Clone)]
pub struct RunOptions {
/// Echo each line of the child's stdout/stderr to this process's
/// stdout/stderr while the output is being read.
pub mirror: bool,
/// Together with `mirror`, decides whether the child's stdout/stderr are
/// piped (either flag set) or inherited from this process (both unset).
pub capture: bool,
/// How the child's standard input is wired up; see [`StdinOption`].
pub stdin: StdinOption,
/// Working directory for the child; `None` leaves it unset on the command.
pub cwd: Option<PathBuf>,
/// Extra environment variables set on the child, if any.
pub env: Option<HashMap<String, String>>,
/// NOTE(review): not read anywhere in the visible code -- confirm its
/// intended effect against the rest of the crate.
pub interactive: bool,
/// When set, `ProcessRunner::start` attempts to parse the command with
/// `parse_shell_command` (unless `needs_real_shell` says otherwise).
pub shell_operators: bool,
/// NOTE(review): not read anywhere in the visible code -- presumably
/// toggles tracing; confirm against the `trace` module.
pub trace: bool,
}
impl Default for RunOptions {
    /// Returns the baseline options: output is both mirrored and captured,
    /// stdin is inherited, no working directory or environment overrides are
    /// applied, and `shell_operators` / `trace` are switched on while
    /// `interactive` is off.
    fn default() -> Self {
        Self {
            // Output handling.
            mirror: true,
            capture: true,
            // Process environment.
            stdin: StdinOption::Inherit,
            cwd: None,
            env: None,
            // Behaviour switches.
            interactive: false,
            shell_operators: true,
            trace: true,
        }
    }
}
/// How a command's standard input is provided.
#[derive(Debug, Clone)]
pub enum StdinOption {
/// The child shares this process's stdin.
Inherit,
/// The child gets a piped stdin. Nothing in the visible code writes to it.
Pipe,
/// The child gets a piped stdin that is fed this text and then closed.
/// Virtual commands receive the text directly through their context.
Content(String),
/// The child's stdin is connected to the null device.
Null,
}
/// Runs a single command line, either through a built-in "virtual" command
/// or by spawning it under the system shell, and records its result.
pub struct ProcessRunner {
/// The full command line as given by the caller.
command: String,
/// Settings controlling stdio wiring, working directory and environment.
options: RunOptions,
/// The spawned shell process; set by `start` and taken by `run`.
child: Option<Child>,
/// The finished command's output and exit code, once available.
result: Option<CommandResult>,
/// Set by `start`; makes later `start` calls no-ops.
started: bool,
/// Set once a result has been stored.
finished: bool,
/// Set by `kill`. Not read anywhere in the visible code.
cancelled: bool,
/// Sending half of the chunk channel; cloned into each virtual command's context.
output_tx: Option<mpsc::Sender<StreamChunk>>,
/// Receiving half of the chunk channel. Not read anywhere in the visible
/// code -- presumably consumed by streaming APIs elsewhere; confirm.
output_rx: Option<mpsc::Receiver<StreamChunk>>,
}
impl ProcessRunner {
pub fn new(command: impl Into<String>, options: RunOptions) -> Self {
let (tx, rx) = mpsc::channel(1024);
ProcessRunner {
command: command.into(),
options,
child: None,
result: None,
started: false,
finished: false,
cancelled: false,
output_tx: Some(tx),
output_rx: Some(rx),
}
}
pub async fn start(&mut self) -> Result<()> {
if self.started {
return Ok(());
}
self.started = true;
utils::trace_lazy("ProcessRunner", || {
format!("Starting command: {}", self.command)
});
let first_word = self.command.split_whitespace().next().unwrap_or("");
if let Some(result) = self.try_virtual_command(first_word).await {
self.result = Some(result);
self.finished = true;
return Ok(());
}
let _parsed = if self.options.shell_operators && !needs_real_shell(&self.command) {
parse_shell_command(&self.command)
} else {
None
};
let shell = find_available_shell();
let mut cmd = Command::new(&shell.cmd);
for arg in &shell.args {
cmd.arg(arg);
}
cmd.arg(&self.command);
match &self.options.stdin {
StdinOption::Inherit => {
cmd.stdin(Stdio::inherit());
}
StdinOption::Pipe => {
cmd.stdin(Stdio::piped());
}
StdinOption::Content(_) => {
cmd.stdin(Stdio::piped());
}
StdinOption::Null => {
cmd.stdin(Stdio::null());
}
}
if self.options.capture || self.options.mirror {
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
} else {
cmd.stdout(Stdio::inherit());
cmd.stderr(Stdio::inherit());
}
if let Some(ref cwd) = self.options.cwd {
cmd.current_dir(cwd);
}
if let Some(ref env_vars) = self.options.env {
for (key, value) in env_vars {
cmd.env(key, value);
}
}
let child = cmd.spawn()?;
self.child = Some(child);
Ok(())
}
pub async fn run(&mut self) -> Result<CommandResult> {
self.start().await?;
if let Some(result) = &self.result {
return Ok(result.clone());
}
let mut child = self.child.take().ok_or_else(|| {
Error::Io(std::io::Error::new(
std::io::ErrorKind::Other,
"Process not started",
))
})?;
if let StdinOption::Content(ref content) = self.options.stdin {
if let Some(mut stdin) = child.stdin.take() {
let content = content.clone();
tokio::spawn(async move {
let _ = stdin.write_all(content.as_bytes()).await;
let _ = stdin.shutdown().await;
});
}
}
let mut stdout_content = String::new();
let mut stderr_content = String::new();
if let Some(stdout) = child.stdout.take() {
let mut reader = BufReader::new(stdout).lines();
while let Ok(Some(line)) = reader.next_line().await {
if self.options.mirror {
println!("{}", line);
}
stdout_content.push_str(&line);
stdout_content.push('\n');
}
}
if let Some(stderr) = child.stderr.take() {
let mut reader = BufReader::new(stderr).lines();
while let Ok(Some(line)) = reader.next_line().await {
if self.options.mirror {
eprintln!("{}", line);
}
stderr_content.push_str(&line);
stderr_content.push('\n');
}
}
let status = child.wait().await?;
let code = status.code().unwrap_or(-1);
let result = CommandResult {
stdout: stdout_content,
stderr: stderr_content,
code,
};
self.result = Some(result.clone());
self.finished = true;
Ok(result)
}
async fn try_virtual_command(&self, cmd_name: &str) -> Option<CommandResult> {
if !commands::are_virtual_commands_enabled() {
return None;
}
let parts: Vec<&str> = self.command.split_whitespace().collect();
let args: Vec<String> = parts.iter().skip(1).map(|s| s.to_string()).collect();
let ctx = CommandContext {
args,
stdin: match &self.options.stdin {
StdinOption::Content(s) => Some(s.clone()),
_ => None,
},
cwd: self.options.cwd.clone(),
env: self.options.env.clone(),
output_tx: self.output_tx.clone(),
is_cancelled: None,
};
match cmd_name {
"echo" => Some(commands::echo(ctx).await),
"pwd" => Some(commands::pwd(ctx).await),
"cd" => Some(commands::cd(ctx).await),
"true" => Some(commands::r#true(ctx).await),
"false" => Some(commands::r#false(ctx).await),
"sleep" => Some(commands::sleep(ctx).await),
"cat" => Some(commands::cat(ctx).await),
"ls" => Some(commands::ls(ctx).await),
"mkdir" => Some(commands::mkdir(ctx).await),
"rm" => Some(commands::rm(ctx).await),
"touch" => Some(commands::touch(ctx).await),
"cp" => Some(commands::cp(ctx).await),
"mv" => Some(commands::mv(ctx).await),
"basename" => Some(commands::basename(ctx).await),
"dirname" => Some(commands::dirname(ctx).await),
"env" => Some(commands::env(ctx).await),
"exit" => Some(commands::exit(ctx).await),
"which" => Some(commands::which(ctx).await),
"yes" => Some(commands::yes(ctx).await),
"seq" => Some(commands::seq(ctx).await),
"test" => Some(commands::test(ctx).await),
_ => None,
}
}
pub fn kill(&mut self) -> Result<()> {
self.cancelled = true;
if let Some(ref mut child) = self.child {
child.start_kill()?;
}
Ok(())
}
pub fn is_finished(&self) -> bool {
self.finished
}
pub fn result(&self) -> Option<&CommandResult> {
self.result.as_ref()
}
pub fn command(&self) -> &str {
&self.command
}
pub fn options(&self) -> &RunOptions {
&self.options
}
}
/// The shell executable and leading arguments used to run a command string.
#[derive(Debug, Clone)]
struct ShellConfig {
/// Shell program name or path, passed to `Command::new`.
cmd: String,
/// Arguments placed before the command string (e.g. `-c` or `/c`).
args: Vec<String>,
}
/// Picks the shell used to execute command strings.
///
/// Candidates are probed in order and the first one found wins:
///
/// * Windows: `cmd.exe /c`, then `powershell.exe -Command`, each located
///   through `which`.
/// * Elsewhere: `/bin/sh`, `/usr/bin/sh`, `/bin/bash`, then `sh` (all with
///   `-c`), each accepted if the path exists or `which` can resolve it.
///
/// When no candidate is found, the first entry of the platform's list is
/// returned anyway so the caller gets a spawn error rather than a panic.
fn find_available_shell() -> ShellConfig {
    let on_windows = cfg!(windows);

    // (program, flag that introduces the command string)
    let candidates: &[(&str, &str)] = if on_windows {
        &[("cmd.exe", "/c"), ("powershell.exe", "-Command")]
    } else {
        &[
            ("/bin/sh", "-c"),
            ("/usr/bin/sh", "-c"),
            ("/bin/bash", "-c"),
            ("sh", "-c"),
        ]
    };

    let located = candidates.iter().copied().find(|&(program, _)| {
        // The direct path check applies to the non-Windows list only.
        (!on_windows && std::path::Path::new(program).exists())
            || which::which(program).is_ok()
    });

    let (program, flag) = located.unwrap_or(candidates[0]);
    ShellConfig {
        cmd: program.to_string(),
        args: vec![flag.to_string()],
    }
}
pub async fn run(command: impl Into<String>) -> Result<CommandResult> {
let mut runner = ProcessRunner::new(command, RunOptions::default());
runner.run().await
}
pub use run as execute;
/// Runs `command` with the given `options` and waits for it to finish.
///
/// # Errors
///
/// Propagates any [`Error`] produced while starting or running the command.
pub async fn exec(command: impl Into<String>, options: RunOptions) -> Result<CommandResult> {
    ProcessRunner::new(command, options).run().await
}
/// Builds a [`ProcessRunner`] for `command` with the given `options` without
/// starting it; the caller drives it with `start` / `run`.
pub fn create(command: impl Into<String>, options: RunOptions) -> ProcessRunner {
ProcessRunner::new(command, options)
}
/// Blocking variant of [`run`]: builds a fresh Tokio runtime and drives the
/// command to completion on it.
///
/// Intended for synchronous callers. Tokio's `block_on` panics when invoked
/// from inside an async execution context, so do not call this from async
/// code -- use [`run`] there instead.
///
/// # Errors
///
/// Returns [`Error::Io`] if the runtime cannot be created, otherwise whatever
/// [`run`] returns.
pub fn run_sync(command: impl Into<String>) -> Result<CommandResult> {
    tokio::runtime::Runtime::new()?.block_on(run(command))
}