use super::command::{
duplicate_shell_fd, run_command, run_command_with_stdio, run_planned_simple_command_with_stdio,
};
use super::simple_command::{plan_simple_command, run_planned_simple_command};
use super::*;
use std::collections::HashSet;
#[cfg(any(
feature = "frontend",
all(test, feature = "test-support", feature = "unix-runtime")
))]
use std::sync::Arc;
use std::thread::{self, ScopedJoinHandle};
/// Runs a serialized machine payload and returns the resulting exit status.
///
/// The payload text is parsed first; on a parse error the message is reported
/// through `shell_errln` and `2` is returned. On success `state` is replaced by
/// a state rebuilt from the payload's checkpoint (reusing the caller's stdio
/// descriptors and outcome captures), and the payload's and-or list is built
/// and executed with the payload's command id set as the active command.
#[cfg(any(
feature = "frontend",
all(test, feature = "test-support", feature = "unix-runtime")
))]
pub(crate) fn run_machine_payload<R: Runtime>(
state: &mut ShellState,
runtime: &mut R,
payload_text: &str,
) -> i32 {
let payload = match super::machine::parse_machine_payload(payload_text) {
Ok(payload) => payload,
Err(err) => {
shell_errln(state, &state.prefixed_message(err));
return 2;
}
};
let definition = Arc::new(payload.definition.into_shell_definition());
// Carry the caller's stdio descriptors and outcome captures over into the
// state that is about to replace `*state`.
let stdin_fd = state.stdin_fd;
let stdout_fd = state.stdout_fd;
let stderr_fd = state.stderr_fd;
let outcome_captures = std::mem::take(&mut state.outcome_captures);
*state = ShellState::from_background_checkpoint(
definition,
payload.state,
stdin_fd,
stdout_fd,
stderr_fd,
outcome_captures,
);
// Bound to a named local so the guard lives until the function returns.
let _signal_forwarding = super::machine::BackgroundSignalForwardGuard::install();
// NOTE(review): this reads the active command id of the freshly restored
// state, not of the state the caller passed in — confirm that is intended.
let previous_command_id = state.active_command_id.clone();
state.set_active_command_id(Some(payload.command_id.clone()));
sync_monitor_mode(state);
super::traps::install_restored_traps(state);
let canonical = super::events::canonical_command_list_text(&payload.and_or_list, false);
super::events::emit_program_start(state, &payload.command_id, None, &canonical);
let built = build_runtime_and_or_list(state, runtime, &payload.and_or_list);
let status = run_exec_and_or_list(state, runtime, &built);
// Pending traps run before the finish event is emitted.
super::traps::run_pending_traps(state, runtime);
super::events::emit_program_finish(state, &payload.command_id, status);
state.set_active_command_id(previous_command_id);
status
}
/// Returns `true` only when `word` is the plain `Word::String` variant.
fn is_syntactically_pure_external_word(word: &Word) -> bool {
    match word {
        Word::String(_) => true,
        _ => false,
    }
}
fn is_syntactically_pure_external_stage(sc: &SimpleCommand) -> bool {
sc.assignments.is_empty()
&& sc.io_redirects.is_empty()
&& sc
.name
.as_ref()
.is_some_and(is_syntactically_pure_external_word)
&& sc.arguments.iter().all(is_syntactically_pure_external_word)
}
/// Implemented by AST nodes that can report whether they carry work a
/// pipeline stage has to defer.
trait PlanDeferredPipelineStageWork<'ast> {
/// Returns a description of the deferred work found in `self`, or `None`
/// when the node contains nothing that needs deferring. `state` is only
/// read.
fn deferred_pipeline_stage_work(
&'ast self,
state: &ShellState,
) -> Option<DeferredPipelineStageWork<'ast>>;
}
impl<'ast> PlanDeferredPipelineStageWork<'ast> for AstCommand {
    /// Simple commands are inspected word by word; every other command kind is
    /// always reported as deferred compound-command work.
    fn deferred_pipeline_stage_work(
        &'ast self,
        state: &ShellState,
    ) -> Option<DeferredPipelineStageWork<'ast>> {
        if let AstCommand::Simple(simple) = self {
            return simple.deferred_pipeline_stage_work(state);
        }
        Some(DeferredPipelineStageWork::CompoundCommand)
    }
}
impl<'ast> PlanDeferredPipelineStageWork<'ast> for SimpleCommand {
    /// Reports the first deferred work found in the command name, falling back
    /// to the first one found among the arguments (in order).
    fn deferred_pipeline_stage_work(
        &'ast self,
        state: &ShellState,
    ) -> Option<DeferredPipelineStageWork<'ast>> {
        let from_name = self
            .name
            .as_ref()
            .and_then(|word| word.deferred_pipeline_stage_work(state));
        if let Some(work) = from_name {
            return Some(work);
        }
        for argument in self.arguments.iter() {
            if let Some(work) = argument.deferred_pipeline_stage_work(state) {
                return Some(work);
            }
        }
        None
    }
}
impl<'ast> PlanDeferredPipelineStageWork<'ast> for Word {
    /// Classifies a single word:
    /// plain strings never defer, command substitutions always defer,
    /// arithmetic words defer when their body requires it, parameter words are
    /// delegated to `deferred_parameter_pipeline_stage_work`, and list words
    /// report the first deferred child.
    fn deferred_pipeline_stage_work(
        &'ast self,
        state: &ShellState,
    ) -> Option<DeferredPipelineStageWork<'ast>> {
        match self {
            Word::String(_) => None,
            Word::Command(_) => {
                let expansion = DeferredExpansion::CommandSubstitution { word: self };
                Some(DeferredPipelineStageWork::Expansion(expansion))
            }
            Word::Arithmetic(arithm) => {
                if arithmetic_requires_deferred_pipeline_execution(arithm.body()) {
                    Some(DeferredPipelineStageWork::Expansion(
                        DeferredExpansion::ArithmeticExpansion { word: self },
                    ))
                } else {
                    None
                }
            }
            Word::Parameter(parameter) => deferred_parameter_pipeline_stage_work(
                self,
                state,
                parameter.name(),
                parameter.op(),
                parameter.colon(),
                parameter.arg(),
            ),
            Word::List(list) => {
                for child in list.children().iter() {
                    if let Some(work) = child.deferred_pipeline_stage_work(state) {
                        return Some(work);
                    }
                }
                None
            }
        }
    }
}
/// Walks an arithmetic expression and returns `true` as soon as a `Raw` or
/// `Assign` node is found; literals and variables alone never require
/// deferral.
fn arithmetic_requires_deferred_pipeline_execution(expr: &ArithmExpr) -> bool {
    match expr {
        ArithmExpr::Raw(_) | ArithmExpr::Assign(_) => true,
        ArithmExpr::Literal(_) | ArithmExpr::Variable(_) => false,
        ArithmExpr::UnOp(unary) => arithmetic_requires_deferred_pipeline_execution(unary.operand()),
        ArithmExpr::BinOp(binary) => {
            if arithmetic_requires_deferred_pipeline_execution(binary.left()) {
                return true;
            }
            arithmetic_requires_deferred_pipeline_execution(binary.right())
        }
        ArithmExpr::Cond(cond) => {
            if arithmetic_requires_deferred_pipeline_execution(cond.cond()) {
                return true;
            }
            if arithmetic_requires_deferred_pipeline_execution(cond.then_branch()) {
                return true;
            }
            arithmetic_requires_deferred_pipeline_execution(cond.else_branch())
        }
    }
}
/// Decides whether a parameter expansion word has to be deferred.
///
/// The parameter's current value is looked up once. With `colon` set, an
/// empty value is treated like an unset one. Depending on `op`, the result is
/// either the expansion itself, whatever deferred work the operator's argument
/// word contains, or `None`.
fn deferred_parameter_pipeline_stage_work<'ast>(
    word: &'ast Word,
    state: &ShellState,
    name: &str,
    op: ParameterOp,
    colon: bool,
    arg: Option<&'ast Word>,
) -> Option<DeferredPipelineStageWork<'ast>> {
    let value = shell_expand::get_parameter_value(state, name);
    // Unset always counts; an empty value counts only for the colon forms.
    let treat_as_unset = match value.as_ref() {
        None => true,
        Some(current) => colon && current.is_empty(),
    };
    let whole_expansion = || {
        Some(DeferredPipelineStageWork::Expansion(
            DeferredExpansion::ParameterExpansion { word, op },
        ))
    };
    let argument_work = || arg.and_then(|inner| inner.deferred_pipeline_stage_work(state));

    match op {
        ParameterOp::None => {
            let nounset_violation = state.has_option(OPT_NOUNSET)
                && !shell_expand::is_nounset_exempt_parameter_name(name)
                && value.is_none();
            if nounset_violation {
                whole_expansion()
            } else {
                None
            }
        }
        ParameterOp::LeadingHash => None,
        // The argument word is only consulted when the operator would use it.
        ParameterOp::Minus if treat_as_unset => argument_work(),
        ParameterOp::Minus => None,
        ParameterOp::Equal | ParameterOp::QMark if treat_as_unset => whole_expansion(),
        ParameterOp::Equal | ParameterOp::QMark => None,
        ParameterOp::Plus if treat_as_unset => None,
        ParameterOp::Plus => argument_work(),
        ParameterOp::Percent
        | ParameterOp::DoublePercent
        | ParameterOp::Hash
        | ParameterOp::DoubleHash => argument_work(),
    }
}
/// Builds a prepared external stage from a planned simple command.
///
/// Returns `None` unless the plan's kind is `External`; otherwise the program
/// and argv are cloned from the plan and paired with the exported exec
/// environment and the current working directory taken from `state`.
fn prepared_external_stage(
    state: &ShellState,
    plan: &PlannedSimpleCommand<'_>,
) -> Option<PreparedExternalStagePlan> {
    match plan.kind() {
        PlannedSimpleCommandKind::External { program, argv } => Some(PreparedExternalStagePlan {
            program: program.clone(),
            argv: argv.clone(),
            env: shell_resolve::exported_exec_environment(state),
            cwd: state.path_state.cwd.clone(),
        }),
        _ => None,
    }
}
/// Hands out sequential work identifiers of the form `work-N`, starting at
/// `work-1` for a default-constructed allocator.
#[derive(Default)]
struct WorkIdAllocator {
    /// The number used by the most recently issued id (0 before the first).
    next: usize,
}

impl WorkIdAllocator {
    /// Advances the counter and returns the identifier for the new value.
    fn next_work_id(&mut self) -> String {
        let issued = self.next + 1;
        self.next = issued;
        format!("work-{issued}")
    }
}
/// Per-stage options used while realizing a pipeline stage.
#[derive(Clone, Copy)]
struct PipelineStageBuildContext<'a> {
    /// Whether the enclosing pipeline has more than one stage.
    multi_stage: bool,
    /// A work id to reuse instead of allocating a fresh one.
    preserved_work_id: Option<&'a str>,
}

impl PipelineStageBuildContext<'_> {
    /// Returns the preserved work id when one is set; otherwise allocates the
    /// next id from `work_ids`. The allocator is untouched in the former case.
    fn next_work_id(self, work_ids: &mut WorkIdAllocator) -> String {
        match self.preserved_work_id {
            Some(preserved) => preserved.to_string(),
            None => work_ids.next_work_id(),
        }
    }
}
/// The piece of AST a `LazyNode` refers to: either an and-or list or a single
/// command, the latter borrowed from the parsed program or owned by the node.
enum LazyAst<'ast> {
// A borrowed and-or list.
AndOr(&'ast AndOrList),
// A command borrowed for the `'ast` lifetime.
BorrowedCommand(&'ast AstCommand),
// A command owned by the node itself (see `LazyNode::owned_command`).
OwnedCommand(Box<AstCommand>),
}
/// Deferred work attached to a lazy node, together with the id used when
/// deferred-work start/finish events are emitted for it.
struct LazyWork<'ast> {
// Identifier passed to the deferred-work events.
work_id: String,
// What kind of deferred work this is.
work: DeferredPipelineStageWork<'ast>,
}
/// A command or and-or list whose execution has been deferred.
pub(super) struct LazyNode<'ast> {
// The AST fragment to run or realize.
ast: LazyAst<'ast>,
// Why the node was deferred; in the code visible here it is stored and read
// but never acted upon.
reason: DeferredReason,
// Optional deferred-work record; when present, start/finish events are
// emitted around command execution.
work: Option<LazyWork<'ast>>,
}
impl<'ast> LazyNode<'ast> {
fn from_planned(node: &ExecLazyNode<'ast>) -> Self {
let ast = match node.ast() {
ExecLazyAst::AndOr(and_or) => LazyAst::AndOr(and_or),
ExecLazyAst::Command(command) => LazyAst::BorrowedCommand(command),
};
let work = match (node.work_id(), node.deferred_work()) {
(Some(work_id), Some(work)) => Some(LazyWork {
work_id: work_id.to_string(),
work,
}),
(None, None) => None,
(work_id, work) => panic!(
"planned lazy node must provide both work_id and deferred_work or neither, got work_id={work_id:?} work={work:?}"
),
};
Self {
ast,
reason: node.reason(),
work,
}
}
fn command(&self) -> Option<&AstCommand> {
match &self.ast {
LazyAst::AndOr(_) => None,
LazyAst::BorrowedCommand(command) => Some(command),
LazyAst::OwnedCommand(command) => Some(command.as_ref()),
}
}
fn and_or(&self) -> Option<&'ast AndOrList> {
match self.ast {
LazyAst::AndOr(and_or) => Some(and_or),
LazyAst::BorrowedCommand(_) | LazyAst::OwnedCommand(_) => None,
}
}
pub(super) fn execute_command<R: Runtime>(
&self,
state: &mut ShellState,
runtime: &mut R,
) -> i32 {
let command = self
.command()
.expect("lazy command execution requires command AST");
let _ = self.reason;
match &self.work {
Some(work) => {
let work_kind = step_kind_for_work(work.work);
let work_detail = step_detail_for_work(command, work.work);
let work_summary = step_summary_for_detail(&work_detail);
shell_events::emit_deferred_work_start(
state,
&work.work_id,
work_kind,
&work_summary,
&work_detail,
);
let status = run_command(state, runtime, command).status;
shell_events::emit_deferred_work_finish(
state,
&work.work_id,
work_kind,
&work_summary,
&work_detail,
status,
);
status
}
None => run_command(state, runtime, command).status,
}
}
pub(super) fn execute_command_with_stdio<R: Runtime>(
&self,
state: &mut ShellState,
runtime: &mut R,
stdin_fd: sys::FileDescriptor,
stdout_fd: sys::FileDescriptor,
stderr_fd: sys::FileDescriptor,
) -> i32 {
let command = self
.command()
.expect("lazy command execution requires command AST");
let _ = self.reason;
match &self.work {
Some(work) => {
let work_kind = step_kind_for_work(work.work);
let work_detail = step_detail_for_work(command, work.work);
let work_summary = step_summary_for_detail(&work_detail);
shell_events::emit_deferred_work_start(
state,
&work.work_id,
work_kind,
&work_summary,
&work_detail,
);
let status =
run_command_with_stdio(state, runtime, command, stdin_fd, stdout_fd, stderr_fd);
shell_events::emit_deferred_work_finish(
state,
&work.work_id,
work_kind,
&work_summary,
&work_detail,
status,
);
status
}
None => run_command_with_stdio(state, runtime, command, stdin_fd, stdout_fd, stderr_fd),
}
}
fn realize_with_shell_state<R: Runtime>(
&self,
state: &mut ShellState,
runtime: &mut R,
) -> ExecAndOrList<'ast> {
let and_or = self
.and_or()
.expect("lazy and-or realization requires and-or AST");
let _ = self.reason;
build_runtime_and_or_list(state, runtime, and_or)
}
}
impl LazyNode<'static> {
    /// Wraps an owned command in a lazy node with the given reason and no
    /// deferred-work record.
    pub(super) fn owned_command(command: AstCommand, reason: DeferredReason) -> Self {
        let ast = LazyAst::OwnedCommand(Box::new(command));
        Self {
            ast,
            reason,
            work: None,
        }
    }
}
/// Commands that can be turned into an executable pipeline stage.
trait LazyCommand<'ast>: PlanDeferredPipelineStageWork<'ast> {
/// Produces the pipeline stage for `self`.
///
/// `build` carries the per-stage options (multi-stage flag, optional
/// preserved work id) and `work_ids` supplies fresh work ids when none is
/// preserved.
fn realize_pipeline_stage<R: Runtime>(
&'ast self,
state: &mut ShellState,
runtime: &mut R,
build: PipelineStageBuildContext<'_>,
work_ids: &mut WorkIdAllocator,
) -> ExecPipelineStage<'ast>;
}
impl<'ast> LazyCommand<'ast> for AstCommand {
fn realize_pipeline_stage<R: Runtime>(
&'ast self,
state: &mut ShellState,
runtime: &mut R,
build: PipelineStageBuildContext<'_>,
work_ids: &mut WorkIdAllocator,
) -> ExecPipelineStage<'ast> {
let mut next_work_id = || build.next_work_id(work_ids);
if build.multi_stage
&& let Some(work) = self.deferred_pipeline_stage_work(state)
{
return ExecPipelineStage::Lazy(ExecLazyNode {
ast: ExecLazyAst::Command(self),
reason: DeferredReason::NeedsCurrentShellState,
work_id: Some(next_work_id()),
work: Some(work),
});
}
let AstCommand::Simple(simple) = self else {
return ExecPipelineStage::Lazy(ExecLazyNode {
ast: ExecLazyAst::Command(self),
reason: DeferredReason::NeedsCurrentShellState,
work_id: Some(next_work_id()),
work: Some(DeferredPipelineStageWork::CompoundCommand),
});
};
let plan = match plan_simple_command(state, runtime, simple) {
Ok(plan) => plan,
Err(status) => return ExecPipelineStage::Failure { status },
};
if build.multi_stage
&& is_syntactically_pure_external_stage(simple)
&& let Some(stage) = prepared_external_stage(state, &plan)
{
return ExecPipelineStage::PreparedExternal(stage);
}
ExecPipelineStage::SimpleCommand(plan)
}
}
/// Maps a piece of deferred work to the matching `DeferredWorkKind`.
fn step_kind_for_work(work: DeferredPipelineStageWork<'_>) -> DeferredWorkKind {
    match work {
        DeferredPipelineStageWork::CommandDispatch => DeferredWorkKind::CommandDispatch,
        DeferredPipelineStageWork::CompoundCommand => DeferredWorkKind::CompoundCommand,
        DeferredPipelineStageWork::Expansion(expansion) => match expansion {
            DeferredExpansion::CommandSubstitution { .. } => DeferredWorkKind::CommandSubstitution,
            DeferredExpansion::ParameterExpansion { .. } => DeferredWorkKind::ParameterExpansion,
            DeferredExpansion::ArithmeticExpansion { .. } => DeferredWorkKind::ArithmeticExpansion,
        },
    }
}
/// Builds the event detail for a piece of deferred work.
///
/// Command-level work clones `command` into the detail; expansion work clones
/// the word the expansion refers to (and, for parameter expansions, carries
/// the operator along).
fn step_detail_for_work(
    command: &AstCommand,
    work: DeferredPipelineStageWork<'_>,
) -> DeferredWorkDetail {
    match work {
        DeferredPipelineStageWork::CommandDispatch => DeferredWorkDetail::CommandDispatch {
            command: command.clone(),
        },
        DeferredPipelineStageWork::CompoundCommand => DeferredWorkDetail::CompoundCommand {
            command: command.clone(),
        },
        DeferredPipelineStageWork::Expansion(expansion) => match expansion {
            DeferredExpansion::CommandSubstitution { word } => {
                DeferredWorkDetail::CommandSubstitution { word: word.clone() }
            }
            DeferredExpansion::ParameterExpansion { word, op } => {
                DeferredWorkDetail::ParameterExpansion {
                    word: word.clone(),
                    op,
                }
            }
            DeferredExpansion::ArithmeticExpansion { word } => {
                DeferredWorkDetail::ArithmeticExpansion { word: word.clone() }
            }
        },
    }
}
/// Renders a deferred-work detail as its `Debug` representation.
fn step_summary_for_detail(detail: &DeferredWorkDetail) -> String {
    format!("{:?}", detail)
}
/// Wraps a command in a lazy command-dispatch stage with a freshly allocated
/// work id, without consulting any shell state.
fn build_static_pipeline_stage<'ast>(
    command: &'ast AstCommand,
    work_ids: &mut WorkIdAllocator,
) -> ExecPipelineStage<'ast> {
    let work_id = work_ids.next_work_id();
    let node = ExecLazyNode {
        ast: ExecLazyAst::Command(command),
        reason: DeferredReason::NeedsCurrentShellState,
        work_id: Some(work_id),
        work: Some(DeferredPipelineStageWork::CommandDispatch),
    };
    ExecPipelineStage::Lazy(node)
}
/// Builds a static pipeline: every command becomes a lazy command-dispatch
/// stage, in source order, and the `bang` flag is copied over.
fn build_static_pipeline<'ast>(
    pipeline: &'ast Pipeline,
    work_ids: &mut WorkIdAllocator,
) -> ShellExecPipeline<'ast> {
    let bang = pipeline.bang;
    let lazy_stages = pipeline
        .commands
        .iter()
        .map(|command| build_static_pipeline_stage(command, work_ids));
    ShellExecPipeline {
        bang,
        stages: lazy_stages.collect(),
    }
}
/// Builds the static form of an and-or list.
///
/// A pipeline is converted directly. For a binary node only the left side is
/// built recursively; the right side is kept as a lazy node marked as a
/// short-circuit right-hand side, with no work id or deferred work attached.
fn build_static_and_or<'ast>(
    and_or: &'ast AndOrList,
    work_ids: &mut WorkIdAllocator,
) -> ExecAndOrList<'ast> {
    let binary = match and_or {
        AndOrList::Pipeline(pipeline) => {
            return ExecAndOrList::Pipeline(build_static_pipeline(pipeline, work_ids));
        }
        AndOrList::BinOp(binary) => binary,
    };
    let op = binary.op();
    let left = Box::new(build_static_and_or(binary.left(), work_ids));
    let right = ExecLazyNode {
        ast: ExecLazyAst::AndOr(binary.right()),
        reason: DeferredReason::ShortCircuitRhs,
        work_id: None,
        work: None,
    };
    ExecAndOrList::BinOp { op, left, right }
}
/// Builds the static form of a command list, keeping a reference to the raw
/// and-or list next to its built counterpart and copying the `ampersand` flag.
fn build_static_command_list<'ast>(
    command_list: &'ast CommandList,
    work_ids: &mut WorkIdAllocator,
) -> ExecCommandList<'ast> {
    let raw_and_or_list = &command_list.and_or_list;
    let and_or_list = build_static_and_or(raw_and_or_list, work_ids);
    ExecCommandList {
        raw_and_or_list,
        and_or_list,
        ampersand: command_list.ampersand,
    }
}
/// Builds the static execution plan for a whole program.
///
/// Every command list in the program body is converted in order, sharing one
/// work-id allocator. The shell state and runtime parameters are accepted but
/// not used.
pub(crate) fn build_program_execution<'ast, R: Runtime>(
    _state: &mut ShellState,
    _runtime: &mut R,
    program: &'ast Program,
) -> ShellExecProgram<'ast> {
    let mut work_ids = WorkIdAllocator::default();
    let command_lists = program
        .body
        .iter()
        .map(|command_list| build_static_command_list(command_list, &mut work_ids));
    ShellExecProgram {
        body: command_lists.collect(),
    }
}
/// Realizes one pipeline stage against the current shell state, always
/// allocating a fresh work id when one is needed.
fn build_runtime_pipeline_stage<'ast, R: Runtime>(
    state: &mut ShellState,
    runtime: &mut R,
    command: &'ast AstCommand,
    multi_stage: bool,
    work_ids: &mut WorkIdAllocator,
) -> ExecPipelineStage<'ast> {
    let build = PipelineStageBuildContext {
        multi_stage,
        preserved_work_id: None,
    };
    command.realize_pipeline_stage(state, runtime, build, work_ids)
}
/// Realizes one pipeline stage against the current shell state, reusing
/// `work_id` (when given) instead of allocating a new one.
fn build_runtime_pipeline_stage_with_preserved_work_id<'ast, R: Runtime>(
    state: &mut ShellState,
    runtime: &mut R,
    command: &'ast AstCommand,
    multi_stage: bool,
    work_id: Option<&str>,
    work_ids: &mut WorkIdAllocator,
) -> ExecPipelineStage<'ast> {
    let build = PipelineStageBuildContext {
        multi_stage,
        preserved_work_id: work_id,
    };
    command.realize_pipeline_stage(state, runtime, build, work_ids)
}
/// Produces a realized copy of a pipeline.
///
/// Lazy command-dispatch stages are re-built against the current shell state
/// (keeping their original work id); every other stage is cloned as is. The
/// shell's last status is reset to its value on entry before and after each
/// stage is processed.
///
/// # Panics
///
/// Panics if a command-dispatch lazy stage lacks a command or a work id.
fn realize_runtime_pipeline<'ast, R>(
    state: &mut ShellState,
    runtime: &mut R,
    pipeline: &ShellExecPipeline<'ast>,
) -> ShellExecPipeline<'ast>
where
    R: Runtime,
{
    let multi_stage = pipeline.stages().len() > 1;
    let saved_status = state.last_status;
    let mut work_ids = WorkIdAllocator::default();
    let bang = pipeline.bang();
    let stages = pipeline
        .stages()
        .iter()
        .map(|stage| {
            state.set_last_status(saved_status);
            let realized = match stage {
                ExecPipelineStage::Lazy(lazy) => match lazy.deferred_work() {
                    Some(DeferredPipelineStageWork::CommandDispatch) => {
                        let command = lazy
                            .command()
                            .expect("command-dispatch lazy stage should carry a command");
                        let work_id = lazy
                            .work_id()
                            .expect("command-dispatch lazy stage should carry a work id");
                        build_runtime_pipeline_stage_with_preserved_work_id(
                            state,
                            runtime,
                            command,
                            multi_stage,
                            Some(work_id),
                            &mut work_ids,
                        )
                    }
                    _ => ExecPipelineStage::Lazy(lazy.clone()),
                },
                ExecPipelineStage::SimpleCommand(plan) => {
                    ExecPipelineStage::SimpleCommand(plan.clone())
                }
                ExecPipelineStage::PreparedExternal(prepared) => {
                    ExecPipelineStage::PreparedExternal(prepared.clone())
                }
                ExecPipelineStage::Failure { status } => {
                    ExecPipelineStage::Failure { status: *status }
                }
            };
            state.set_last_status(saved_status);
            realized
        })
        .collect();
    ShellExecPipeline { bang, stages }
}
/// Realizes an and-or list: pipelines are realized directly, binary nodes
/// realize only their left side and clone the (still lazy) right side.
pub(super) fn realize_runtime_and_or_list<'ast, R: Runtime>(
    state: &mut ShellState,
    runtime: &mut R,
    and_or: &ExecAndOrList<'ast>,
) -> ExecAndOrList<'ast> {
    match and_or {
        ExecAndOrList::BinOp { op, left, right } => {
            let realized_left = realize_runtime_and_or_list(state, runtime, left);
            ExecAndOrList::BinOp {
                op: *op,
                left: Box::new(realized_left),
                right: right.clone(),
            }
        }
        ExecAndOrList::Pipeline(pipeline) => {
            let realized = realize_runtime_pipeline(state, runtime, pipeline);
            ExecAndOrList::Pipeline(realized)
        }
    }
}
/// Builds every stage of a pipeline against the current shell state.
///
/// The shell's last status is reset to its value on entry before and after
/// each stage is built.
fn build_runtime_pipeline<'ast, R: Runtime>(
    state: &mut ShellState,
    runtime: &mut R,
    pipeline: &'ast Pipeline,
    work_ids: &mut WorkIdAllocator,
) -> ShellExecPipeline<'ast> {
    let multi_stage = pipeline.commands.len() > 1;
    let saved_status = state.last_status;
    let bang = pipeline.bang;
    let stages = pipeline
        .commands
        .iter()
        .map(|command| {
            state.set_last_status(saved_status);
            let built =
                build_runtime_pipeline_stage(state, runtime, command, multi_stage, work_ids);
            state.set_last_status(saved_status);
            built
        })
        .collect();
    ShellExecPipeline { bang, stages }
}
/// Builds the runtime form of a single pipeline using a fresh work-id
/// allocator.
pub(crate) fn build_pipeline_execution<'ast, R: Runtime>(
    state: &mut ShellState,
    runtime: &mut R,
    pipeline: &'ast Pipeline,
) -> ShellExecPipeline<'ast> {
    let work_ids = &mut WorkIdAllocator::default();
    build_runtime_pipeline(state, runtime, pipeline, work_ids)
}
/// Builds the runtime form of an and-or list with a caller-supplied work-id
/// allocator.
///
/// Pipelines are built immediately. For a binary node only the left side is
/// built; the right side stays a lazy short-circuit node without a work id.
fn build_runtime_and_or_list_with_work_ids<'ast, R: Runtime>(
    state: &mut ShellState,
    runtime: &mut R,
    and_or: &'ast AndOrList,
    work_ids: &mut WorkIdAllocator,
) -> ExecAndOrList<'ast> {
    let binary = match and_or {
        AndOrList::Pipeline(pipeline) => {
            let built = build_runtime_pipeline(state, runtime, pipeline, work_ids);
            return ExecAndOrList::Pipeline(built);
        }
        AndOrList::BinOp(binary) => binary,
    };
    let op = binary.op();
    let left = Box::new(build_runtime_and_or_list_with_work_ids(
        state,
        runtime,
        binary.left(),
        work_ids,
    ));
    let right = ExecLazyNode {
        ast: ExecLazyAst::AndOr(binary.right()),
        reason: DeferredReason::ShortCircuitRhs,
        work_id: None,
        work: None,
    };
    ExecAndOrList::BinOp { op, left, right }
}
/// Builds the runtime form of an and-or list using a fresh work-id allocator.
pub(super) fn build_runtime_and_or_list<'ast, R: Runtime>(
    state: &mut ShellState,
    runtime: &mut R,
    and_or: &'ast AndOrList,
) -> ExecAndOrList<'ast> {
    let work_ids = &mut WorkIdAllocator::default();
    build_runtime_and_or_list_with_work_ids(state, runtime, and_or, work_ids)
}
/// Runs `run` and returns its result; when `enabled`, the call is bracketed by
/// pushing `context` onto the state's errexit context stack and popping it
/// afterwards.
fn with_errexit_context_if<T>(
    state: &mut ShellState,
    enabled: bool,
    context: ErrexitContext,
    run: impl FnOnce(&mut ShellState) -> T,
) -> T {
    if enabled {
        state.push_errexit_context(context);
        let outcome = run(state);
        state.pop_errexit_context();
        outcome
    } else {
        run(state)
    }
}
/// Executes a pipeline and returns its final status.
///
/// For a `!`-prefixed pipeline the body runs inside the inverted-pipeline
/// errexit context; the raw status is then passed through
/// `finalize_pipeline_status`.
pub(super) fn run_exec_pipeline<R: Runtime>(
    state: &mut ShellState,
    runtime: &mut R,
    pipeline: &ShellExecPipeline<'_>,
) -> i32 {
    let inverted = pipeline.bang();
    let raw_status = with_errexit_context_if(
        state,
        inverted,
        ErrexitContext::InvertedPipeline,
        |inner_state| run_exec_pipeline_body(inner_state, runtime, pipeline),
    );
    finalize_pipeline_status(state, pipeline.bang(), raw_status)
}
/// Runs the stages of a pipeline and returns the collected status, before any
/// `!` inversion is applied.
///
/// A pipeline with exactly one stage is delegated to
/// `run_single_stage_pipeline`. Otherwise the connecting pipes are created
/// first (a failure there is returned as the status), external stages are
/// spawned, shell stages are executed, and the statuses are collected.
fn run_exec_pipeline_body<R: Runtime>(
state: &mut ShellState,
runtime: &mut R,
pipeline: &ShellExecPipeline<'_>,
) -> i32 {
if pipeline.stages().len() == 1 {
return run_single_stage_pipeline(state, runtime, &pipeline.stages()[0]);
}
let mut prepared = match prepare_multi_stage_pipeline(pipeline.stages().len()) {
Ok(prepared) => prepared,
Err(status) => return status,
};
spawn_external_pipeline_stages(state, runtime, pipeline, &mut prepared);
// The foreground is only claimed when job control is active, stdin is a
// valid descriptor, and every stage is a prepared external command. A failed
// claim is ignored (`.ok()`), leaving no guard to release.
let foreground_guard = if shell_job_control_active(state)
&& state.stdin_fd.is_valid()
&& pipeline
.stages()
.iter()
.all(|stage| matches!(stage, ExecPipelineStage::PreparedExternal(_)))
{
prepared
.foreground_group
.and_then(|process| runtime.claim_foreground(process, state.stdin_fd).ok())
} else {
None
};
execute_shell_pipeline_stages(state, runtime, pipeline, &mut prepared);
let status = collect_pipeline_statuses(state, runtime, &mut prepared);
// Release the foreground after statuses were collected; the result of the
// release is deliberately discarded.
if let Some(guard) = foreground_guard {
let _ = runtime.release_foreground(guard);
}
status
}
/// Applies `!` inversion to a pipeline status.
///
/// The status is returned untouched when control flow is pending or the
/// pipeline is not inverted. Otherwise the next status is marked as
/// errexit-exempt, and the result is the logical negation of `status` (1 for
/// zero, 0 for non-zero) — except that a non-zero status is passed through
/// unchanged while `state.exit_code` is non-negative.
fn finalize_pipeline_status(state: &mut ShellState, bang: bool, status: i32) -> i32 {
    let control_flow_pending = state.control_flow != ControlFlow::None;
    if control_flow_pending || !bang {
        return status;
    }
    state.mark_next_status_errexit_exempt(ErrexitContext::InvertedPipeline);
    let keep_raw_status = status != 0 && state.exit_code >= 0;
    if keep_raw_status {
        status
    } else {
        i32::from(status == 0)
    }
}
/// Spawns a prepared external command in the foreground and waits for it.
///
/// The child gets a new process group when job control is active and inherits
/// the current one otherwise. A spawn error is converted to an exit status via
/// `sys::spawn_error_exit_status`.
fn run_prepared_external_stage<R: Runtime>(
    state: &mut ShellState,
    runtime: &mut R,
    stage: &PreparedExternalStagePlan,
) -> i32 {
    let job_control = shell_job_control_active(state);
    let group_plan = if job_control {
        ProcessGroupPlan::New
    } else {
        ProcessGroupPlan::Inherit
    };
    let launch = ChildLaunchPlan::new(
        state,
        stage.program().to_string(),
        stage.argv().to_vec(),
        group_plan,
    )
    .with_env(stage.env().to_vec())
    .with_cwd(stage.cwd().to_path_buf());
    match launch.spawn(runtime, sys::SpawnMode::Foreground) {
        Err(err) => sys::spawn_error_exit_status(&err),
        Ok(child) => launch.wait_foreground(state, runtime, child),
    }
}
/// Runs the only stage of a one-stage pipeline in the current shell state and
/// returns its status. A `Failure` stage simply yields its stored status.
fn run_single_stage_pipeline<R: Runtime>(
    state: &mut ShellState,
    runtime: &mut R,
    stage: &ExecPipelineStage<'_>,
) -> i32 {
    match stage {
        ExecPipelineStage::Failure { status } => *status,
        ExecPipelineStage::Lazy(lazy) => {
            let node = LazyNode::from_planned(lazy);
            node.execute_command(state, runtime)
        }
        ExecPipelineStage::SimpleCommand(plan) => {
            let outcome = run_planned_simple_command(state, runtime, plan);
            outcome.status
        }
        ExecPipelineStage::PreparedExternal(external) => {
            run_prepared_external_stage(state, runtime, external)
        }
    }
}
/// Bookkeeping for one multi-stage pipeline run, created by
/// `prepare_multi_stage_pipeline`.
struct PreparedPipelineExecution {
// Pipes connecting adjacent stages: pipe `i` links stage `i` to stage `i + 1`.
pipes: Vec<sys::OsPipe>,
// Starts empty; presumably filled as external stages are spawned — the code
// that does so is outside this view.
children: Vec<PipelineChild>,
// One exit status per stage, initialized to 0.
statuses: Vec<i32>,
// Process handed to `claim_foreground` when the pipeline takes the
// foreground; starts as `None`.
foreground_group: Option<sys::ProcessHandle>,
// Starts as `None`; NOTE(review): not read in the visible code — confirm use.
foreground_group_display_pid: Option<u32>,
}
/// A process handle paired with the index of the pipeline stage it belongs to.
// NOTE(review): construction and use happen outside the visible code.
struct PipelineChild {
stage_index: usize,
handle: sys::ProcessHandle,
}
/// Creates the pipes and bookkeeping needed to run a pipeline of
/// `stage_count` stages.
///
/// `stage_count` stages are connected by `stage_count - 1` pipes. A
/// `stage_count` of zero yields no pipes and no statuses instead of
/// underflowing the subtraction.
///
/// # Errors
///
/// Returns `Err(1)` when a pipe cannot be created; pipes created so far are
/// closed before returning.
fn prepare_multi_stage_pipeline(stage_count: usize) -> Result<PreparedPipelineExecution, i32> {
    // Saturate so an empty stage list asks for zero pipes rather than
    // panicking (debug) or requesting a huge allocation (release).
    let pipe_count = stage_count.saturating_sub(1);
    let mut pipes: Vec<sys::OsPipe> = Vec::with_capacity(pipe_count);
    for _ in 0..pipe_count {
        match sys::OsPipe::new() {
            Ok(pipe) => pipes.push(pipe),
            Err(_) => {
                close_pipeline_pipes(&mut pipes);
                return Err(1);
            }
        }
    }
    Ok(PreparedPipelineExecution {
        pipes,
        children: Vec::new(),
        statuses: vec![0; stage_count],
        foreground_group: None,
        foreground_group_display_pid: None,
    })
}
/// Closes both ends of every pipe and marks each end as invalid.
fn close_pipeline_pipes(pipes: &mut [sys::OsPipe]) {
    pipes.iter_mut().for_each(|pipe| {
        pipe.read_fd.close();
        pipe.read_fd = sys::FileDescriptor::INVALID;
        pipe.write_fd.close();
        pipe.write_fd = sys::FileDescriptor::INVALID;
    });
}
/// Picks the `(stdin, stdout, stderr)` descriptors for one pipeline stage.
///
/// The first stage reads from the shell's stdin and the last stage writes to
/// the shell's stdout; every other end is wired to the neighbouring pipe.
/// Stderr is always the shell's stderr.
fn pipeline_stage_stdio(
    state: &ShellState,
    stage_index: usize,
    stage_count: usize,
    pipes: &[sys::OsPipe],
) -> (
    sys::FileDescriptor,
    sys::FileDescriptor,
    sys::FileDescriptor,
) {
    let stdin_fd = match stage_index {
        0 => state.stdin_fd,
        index => pipes[index - 1].read_fd,
    };
    let has_downstream = stage_index < stage_count - 1;
    let stdout_fd = if has_downstream {
        pipes[stage_index].write_fd
    } else {
        state.stdout_fd
    };
    (stdin_fd, stdout_fd, state.stderr_fd)
}
/// Everything a shell-executed pipeline stage needs to run on its own: a
/// forked shell state and runtime plus its own stdio descriptors.
///
/// The three descriptors are closed when the value is dropped.
struct PipelineStageRuntime<R: Runtime> {
// Shell state forked for the pipeline-stage execution context.
state: ShellState,
// Runtime obtained from `Runtime::fork`.
runtime: R,
// Duplicated descriptors owned by this stage.
stdin_fd: sys::FileDescriptor,
stdout_fd: sys::FileDescriptor,
stderr_fd: sys::FileDescriptor,
// When set, a `ProcessGlobalGuard` is captured for the duration of the
// stage's run.
process_global_guard: bool,
}
impl<R: Runtime> Drop for PipelineStageRuntime<R> {
    /// Closes the stage's stdin, stdout and stderr descriptors.
    fn drop(&mut self) {
        let (stdin_fd, stdout_fd, stderr_fd) = (self.stdin_fd, self.stdout_fd, self.stderr_fd);
        close_pipeline_stage_stdio(stdin_fd, stdout_fd, stderr_fd);
    }
}
/// Reports a pipeline setup problem through `shell_errln`, using the state's
/// prefixed message format.
fn report_pipeline_stage_setup_error(state: &ShellState, message: impl AsRef<str>) {
    let text = message.as_ref();
    let prefixed = state.prefixed_message(text);
    shell_errln(state, &prefixed);
}
/// Duplicates one stdio descriptor for a pipeline stage.
///
/// # Errors
///
/// On failure the error is reported (naming `stream`) and `Err(1)` is
/// returned.
fn duplicate_pipeline_stage_fd(
    state: &ShellState,
    fd: sys::FileDescriptor,
    stream: &str,
) -> Result<sys::FileDescriptor, i32> {
    match duplicate_shell_fd(fd) {
        Ok(duplicate) => Ok(duplicate),
        Err(err) => {
            report_pipeline_stage_setup_error(
                state,
                format!("failed to duplicate pipeline stage {stream}: {err}"),
            );
            Err(1)
        }
    }
}
/// Duplicates stdin, stdout and stderr (in that order) for a pipeline stage.
///
/// # Errors
///
/// Returns the failing duplication's status. Descriptors that were already
/// duplicated are closed before returning, so nothing leaks on a partial
/// failure.
fn duplicate_pipeline_stage_stdio(
    state: &ShellState,
    stdin_fd: sys::FileDescriptor,
    stdout_fd: sys::FileDescriptor,
    stderr_fd: sys::FileDescriptor,
) -> Result<
    (
        sys::FileDescriptor,
        sys::FileDescriptor,
        sys::FileDescriptor,
    ),
    i32,
> {
    let stdin_dup = duplicate_pipeline_stage_fd(state, stdin_fd, "stdin")?;
    let stdout_dup =
        duplicate_pipeline_stage_fd(state, stdout_fd, "stdout").map_err(|status| {
            stdin_dup.close();
            status
        })?;
    let stderr_dup =
        duplicate_pipeline_stage_fd(state, stderr_fd, "stderr").map_err(|status| {
            stdin_dup.close();
            stdout_dup.close();
            status
        })?;
    Ok((stdin_dup, stdout_dup, stderr_dup))
}
/// Closes a stage's stdin, stdout and stderr descriptors, in that order.
fn close_pipeline_stage_stdio(
    stdin_fd: sys::FileDescriptor,
    stdout_fd: sys::FileDescriptor,
    stderr_fd: sys::FileDescriptor,
) {
    for fd in [stdin_fd, stdout_fd, stderr_fd] {
        fd.close();
    }
}
/// Prepares the self-contained runtime for a shell-executed pipeline stage.
///
/// The three stdio descriptors are duplicated, the runtime is forked, and the
/// shell state is forked for the pipeline-stage context.
///
/// # Errors
///
/// Returns the duplication status if a descriptor cannot be duplicated, or
/// `Err(1)` if the runtime cannot be forked; in the latter case the duplicated
/// descriptors are closed and the error is reported.
fn prepare_shell_pipeline_stage<R: Runtime>(
    state: &ShellState,
    runtime: &R,
    stdin_fd: sys::FileDescriptor,
    stdout_fd: sys::FileDescriptor,
    stderr_fd: sys::FileDescriptor,
    process_global_guard: bool,
) -> Result<PipelineStageRuntime<R>, i32> {
    let (stdin_fd, stdout_fd, stderr_fd) =
        duplicate_pipeline_stage_stdio(state, stdin_fd, stdout_fd, stderr_fd)?;
    let forked_runtime = runtime.fork().map_err(|err| {
        close_pipeline_stage_stdio(stdin_fd, stdout_fd, stderr_fd);
        report_pipeline_stage_setup_error(
            state,
            format!("failed to fork pipeline shell runtime: {err}"),
        );
        1
    })?;
    let forked_state = state.fork_for(ExecutionContextKind::PipelineStage);
    Ok(PipelineStageRuntime {
        state: forked_state,
        runtime: forked_runtime,
        stdin_fd,
        stdout_fd,
        stderr_fd,
        process_global_guard,
    })
}
/// The work a shell-executed pipeline stage performs: either an already
/// planned simple command or a lazy node that is executed as a command.
#[derive(Clone, Copy)]
enum ShellPipelineStageWork<'plan, 'ast> {
Simple(&'plan PlannedSimpleCommand<'ast>),
Lazy(&'plan ExecLazyNode<'ast>),
}
/// Runs one shell-executed pipeline stage to completion and returns its
/// status.
///
/// When the stage asks for it, a `ProcessGlobalGuard` is captured before the
/// work runs. After the work finishes the guard is dropped first, then the
/// stage itself (which closes its stdio descriptors).
fn run_shell_pipeline_stage<R: Runtime>(
    mut stage: PipelineStageRuntime<R>,
    work: ShellPipelineStageWork<'_, '_>,
) -> i32 {
    let process_globals = if stage.process_global_guard {
        Some(ProcessGlobalGuard::capture_for(&stage.state))
    } else {
        None
    };
    let status = match work {
        ShellPipelineStageWork::Lazy(lazy) => {
            let node = LazyNode::from_planned(lazy);
            node.execute_command_with_stdio(
                &mut stage.state,
                &mut stage.runtime,
                stage.stdin_fd,
                stage.stdout_fd,
                stage.stderr_fd,
            )
        }
        ShellPipelineStageWork::Simple(plan) => run_planned_simple_command_with_stdio(
            &mut stage.state,
            &mut stage.runtime,
            plan,
            stage.stdin_fd,
            stage.stdout_fd,
            stage.stderr_fd,
        ),
    };
    // Explicit drops pin the teardown order: guard first, then the stage.
    drop(process_globals);
    drop(stage);
    status
}
/// A pipeline stage index paired with the scoped thread handle whose join
/// result is that stage's exit status.
type ShellPipelineStageHandle<'scope> = (usize, ScopedJoinHandle<'scope, i32>);
/// Joins every stage thread and stores its exit status at the stage's index.
///
/// A thread that panicked is reported as a setup error and recorded with
/// status 128.
fn join_shell_pipeline_stages(
    state: &ShellState,
    handles: Vec<ShellPipelineStageHandle<'_>>,
    statuses: &mut [i32],
) {
    for (stage_index, handle) in handles {
        let status = handle.join().unwrap_or_else(|_| {
            report_pipeline_stage_setup_error(state, "pipeline shell stage thread panicked");
            128
        });
        statuses[stage_index] = status;
    }
}
/// Closes the pipe ends used by the stage at `stage_index` and marks them
/// invalid: the read end of the pipe feeding the stage (unless it is the
/// first stage) and the write end of the pipe it feeds (unless it is the last
/// stage).
fn close_external_stage_parent_pipe_ends(
    prepared: &mut PreparedPipelineExecution,
    stage_index: usize,
    stage_count: usize,
) {
    if stage_index > 0 {
        let upstream = &mut prepared.pipes[stage_index - 1];
        upstream.read_fd.close();
        upstream.read_fd = sys::FileDescriptor::INVALID;
    }
    if stage_index < stage_count - 1 {
        let downstream = &mut prepared.pipes[stage_index];
        downstream.write_fd.close();
        downstream.write_fd = sys::FileDescriptor::INVALID;
    }
}
/// Returns the text of a plain string word, or `None` for any other word kind.
fn literal_word_value(word: &Word) -> Option<&str> {
    if let Word::String(string) = word {
        Some(string.value())
    } else {
        None
    }
}
/// Returns `true` for the builtin names treated as process-global here:
/// `ulimit` and `umask`.
fn command_name_is_process_global_builtin(name: &str) -> bool {
    name == "ulimit" || name == "umask"
}
fn argv_invokes_process_global_builtin(argv: &[String]) -> bool {
let Some(name) = argv.first().map(String::as_str) else {
return false;
};
if command_name_is_process_global_builtin(name) {
return true;
}
if matches!(name, "builtin" | "command") {
return argv
.get(1)
.is_some_and(|name| command_name_is_process_global_builtin(name));
}
false
}
/// Builds an argv from `name` followed by the command's arguments, provided
/// every argument is a plain string word; returns `None` otherwise.
fn literal_words_as_argv(command: &SimpleCommand, name: &str) -> Option<Vec<String>> {
    let literal_args = command
        .arguments()
        .iter()
        .map(|arg| literal_word_value(arg).map(ToString::to_string))
        .collect::<Option<Vec<String>>>()?;
    let mut argv = Vec::with_capacity(literal_args.len() + 1);
    argv.push(name.to_string());
    argv.extend(literal_args);
    Some(argv)
}
/// Conservatively decides whether calling the shell function `name` may touch
/// process-global state.
///
/// `seen_functions` tracks the functions currently being analysed: a function
/// that is already in the set (i.e. recursion) is answered with `true`, as is
/// a name without a definition. The name is removed from the set again once
/// its body has been analysed.
fn shell_function_may_mutate_current_process_globals(
    state: &ShellState,
    name: &str,
    seen_functions: &mut HashSet<String>,
) -> bool {
    let newly_inserted = seen_functions.insert(name.to_string());
    if !newly_inserted {
        return true;
    }
    let verdict = match state.function_definition(name) {
        Some(function) => {
            command_may_mutate_current_process_globals(state, &function.body, seen_functions)
        }
        None => true,
    };
    seen_functions.remove(name);
    verdict
}
/// Conservatively decides whether a simple command may touch process-global
/// state.
///
/// * No command name: `false`. A name that is not a plain string word: `true`.
/// * A process-global builtin name: `true`.
/// * `builtin` / `command`: decided by the first argument — `false` when there
///   is none, `true` when it is not a plain string word, otherwise whether it
///   names a process-global builtin.
/// * A defined shell function: decided by analysing the function.
/// * Anything else: `true` only when all arguments are plain string words and
///   the resulting argv has a shell override.
fn simple_command_may_mutate_current_process_globals(
    state: &ShellState,
    command: &SimpleCommand,
    seen_functions: &mut HashSet<String>,
) -> bool {
    let name = match command.name().and_then(literal_word_value) {
        Some(name) => name,
        None => return command.name().is_some(),
    };
    if command_name_is_process_global_builtin(name) {
        return true;
    }
    if name == "builtin" || name == "command" {
        return match command.arguments().first() {
            None => false,
            Some(target) => match literal_word_value(target) {
                Some(target_name) => command_name_is_process_global_builtin(target_name),
                None => true,
            },
        };
    }
    if state.function_definition(name).is_some() {
        return shell_function_may_mutate_current_process_globals(state, name, seen_functions);
    }
    match literal_words_as_argv(command, name) {
        Some(argv) => shell_resolve::has_shell_override(state, argv.as_slice()),
        None => false,
    }
}
/// Returns `true` as soon as one of the command lists contains an and-or list
/// that may touch process-global state.
fn command_lists_may_mutate_current_process_globals_with_seen(
    state: &ShellState,
    lists: &[CommandList],
    seen_functions: &mut HashSet<String>,
) -> bool {
    for list in lists {
        let and_or = list.and_or_list();
        if and_or_list_may_mutate_current_process_globals(state, and_or, seen_functions) {
            return true;
        }
    }
    false
}
/// Decides whether an and-or list may touch process-global state.
///
/// Only pipelines with exactly one command are inspected; pipelines of any
/// other length are answered with `false`. A binary node is `true` when
/// either side is.
fn and_or_list_may_mutate_current_process_globals(
    state: &ShellState,
    and_or: &AndOrList,
    seen_functions: &mut HashSet<String>,
) -> bool {
    match and_or {
        AndOrList::BinOp(binary) => {
            if and_or_list_may_mutate_current_process_globals(state, binary.left(), seen_functions)
            {
                return true;
            }
            and_or_list_may_mutate_current_process_globals(state, binary.right(), seen_functions)
        }
        AndOrList::Pipeline(pipeline) => {
            if pipeline.commands().len() != 1 {
                return false;
            }
            match pipeline.commands().first() {
                Some(command) => {
                    command_may_mutate_current_process_globals(state, command, seen_functions)
                }
                None => false,
            }
        }
    }
}
/// Decides whether the `elif`/`else` part of an `if` may touch process-global
/// state: an `elif` is analysed like a nested `if`, an `else` by its body.
fn else_part_may_mutate_current_process_globals(
    state: &ShellState,
    else_part: &ElsePart,
    seen_functions: &mut HashSet<String>,
) -> bool {
    match else_part {
        ElsePart::Else(else_clause) => {
            let body = else_clause.body();
            command_lists_may_mutate_current_process_globals_with_seen(state, body, seen_functions)
        }
        ElsePart::Elif(nested_if) => {
            if_clause_may_mutate_current_process_globals(state, nested_if, seen_functions)
        }
    }
}
/// Decides whether an `if` clause may touch process-global state by checking,
/// in order, its condition, its body, and its optional else part.
fn if_clause_may_mutate_current_process_globals(
    state: &ShellState,
    if_clause: &IfClause,
    seen_functions: &mut HashSet<String>,
) -> bool {
    if command_lists_may_mutate_current_process_globals_with_seen(
        state,
        if_clause.condition(),
        seen_functions,
    ) {
        return true;
    }
    if command_lists_may_mutate_current_process_globals_with_seen(
        state,
        if_clause.body(),
        seen_functions,
    ) {
        return true;
    }
    match if_clause.else_part() {
        Some(else_part) => {
            else_part_may_mutate_current_process_globals(state, else_part, seen_functions)
        }
        None => false,
    }
}
/// Reports whether a single AST command may mutate current-process globals.
///
/// Per variant:
/// - a subshell is never flagged;
/// - simple commands and if-clauses defer to their dedicated analysers;
/// - a function definition is analysed through its body command;
/// - brace groups and `for` clauses are analysed through their body lists;
/// - loop clauses are analysed through condition, then body;
/// - case clauses are analysed through the body of each item, in order.
fn command_may_mutate_current_process_globals(
    state: &ShellState,
    command: &AstCommand,
    seen_functions: &mut HashSet<String>,
) -> bool {
    match command {
        AstCommand::Subshell(_) => false,
        AstCommand::Simple(simple) => {
            simple_command_may_mutate_current_process_globals(state, simple, seen_functions)
        }
        AstCommand::If(if_clause) => {
            if_clause_may_mutate_current_process_globals(state, if_clause, seen_functions)
        }
        AstCommand::FunctionDef(function) => {
            command_may_mutate_current_process_globals(state, function.body(), seen_functions)
        }
        AstCommand::BraceGroup(group) => {
            command_lists_may_mutate_current_process_globals_with_seen(
                state,
                group.body(),
                seen_functions,
            )
        }
        AstCommand::For(for_clause) => {
            command_lists_may_mutate_current_process_globals_with_seen(
                state,
                for_clause.body(),
                seen_functions,
            )
        }
        AstCommand::Loop(loop_clause) => {
            if command_lists_may_mutate_current_process_globals_with_seen(
                state,
                loop_clause.condition(),
                seen_functions,
            ) {
                return true;
            }
            command_lists_may_mutate_current_process_globals_with_seen(
                state,
                loop_clause.body(),
                seen_functions,
            )
        }
        AstCommand::Case(case_clause) => {
            for item in case_clause.items().iter() {
                if command_lists_may_mutate_current_process_globals_with_seen(
                    state,
                    item.body(),
                    seen_functions,
                ) {
                    return true;
                }
            }
            false
        }
    }
}
/// Entry point for analysing a slice of command lists.
///
/// Starts the analysis with an empty set of already-seen function names.
pub(super) fn command_lists_may_mutate_current_process_globals(
    state: &ShellState,
    lists: &[CommandList],
) -> bool {
    let mut seen_functions = HashSet::new();
    command_lists_may_mutate_current_process_globals_with_seen(state, lists, &mut seen_functions)
}
pub(super) fn program_may_mutate_current_process_globals(
state: &ShellState,
program: &Program,
) -> bool {
command_lists_may_mutate_current_process_globals(state, &program.body)
}
/// Reports whether an already-planned simple command may mutate
/// current-process globals.
///
/// Shell overrides and command-not-found handlers are always flagged. Builtins
/// are decided by their argv, functions by analysing the named function with a
/// fresh seen-set. Every other plan kind is not flagged.
fn planned_simple_command_may_mutate_current_process_globals(
    state: &ShellState,
    plan: &PlannedSimpleCommand<'_>,
) -> bool {
    match plan.kind() {
        PlannedSimpleCommandKind::ShellOverride { .. }
        | PlannedSimpleCommandKind::CommandNotFoundHandler { .. } => true,
        PlannedSimpleCommandKind::Builtin { argv } => argv_invokes_process_global_builtin(argv),
        PlannedSimpleCommandKind::Function { command_name, .. } => {
            let mut seen_functions = HashSet::new();
            shell_function_may_mutate_current_process_globals(
                state,
                command_name,
                &mut seen_functions,
            )
        }
        _ => false,
    }
}
/// Reports whether the work assigned to an in-shell pipeline stage may mutate
/// process globals.
///
/// Planned simple commands use the plan-based analysis. Lazy stages are
/// analysed through their command when one is available, and are not flagged
/// when `command()` yields `None`.
fn shell_pipeline_stage_may_mutate_process_globals(
    state: &ShellState,
    work: ShellPipelineStageWork<'_, '_>,
) -> bool {
    match work {
        ShellPipelineStageWork::Simple(plan) => {
            planned_simple_command_may_mutate_current_process_globals(state, plan)
        }
        ShellPipelineStageWork::Lazy(lazy) => match lazy.command() {
            Some(command) => {
                let mut seen_functions = HashSet::new();
                command_may_mutate_current_process_globals(state, command, &mut seen_functions)
            }
            None => false,
        },
    }
}
/// Spawns every `PreparedExternal` stage of `pipeline` as a child process.
///
/// For each such stage this:
/// 1. resolves the stage's stdin/stdout/stderr descriptors from the prepared pipes;
/// 2. lists every other pipe end as a descriptor to close in the child;
/// 3. picks a process-group plan: inherit when job control is inactive, otherwise
///    join the group of the first successfully spawned child, or start a new group
///    when no child has been spawned yet;
/// 4. spawns in foreground mode, recording either the child handle or the
///    spawn-error exit status for the stage;
/// 5. closes the parent's pipe ends for the stage, whether or not the spawn worked.
///
/// Stages of any other kind are left untouched.
fn spawn_external_pipeline_stages<R: Runtime>(
    state: &ShellState,
    runtime: &mut R,
    pipeline: &ShellExecPipeline<'_>,
    prepared: &mut PreparedPipelineExecution,
) {
    let stage_count = pipeline.stages().len();
    let job_control = shell_job_control_active(state);
    let mut group_leader = None;
    for (index, stage) in pipeline.stages().iter().enumerate() {
        let ExecPipelineStage::PreparedExternal(external) = stage else {
            continue;
        };
        let (stdin_fd, stdout_fd, stderr_fd) =
            pipeline_stage_stdio(state, index, stage_count, &prepared.pipes);

        // Every pipe end that is not one of this stage's own stdio descriptors
        // is handed to the launch plan as a descriptor to close.
        let mut unrelated_fds: Vec<sys::FileDescriptor> =
            Vec::with_capacity(prepared.pipes.len() * 2);
        for pipe in &prepared.pipes {
            for fd in [pipe.read_fd, pipe.write_fd] {
                let used_by_stage = fd == stdin_fd || fd == stdout_fd || fd == stderr_fd;
                if !used_by_stage {
                    unrelated_fds.push(fd);
                }
            }
        }

        let process_group = match (job_control, group_leader) {
            (false, _) => ProcessGroupPlan::Inherit,
            (true, Some(leader)) => ProcessGroupPlan::Join(leader),
            (true, None) => ProcessGroupPlan::New,
        };
        let launch = ChildLaunchPlan::new(
            state,
            external.program().to_string(),
            external.argv().to_vec(),
            process_group,
        )
        .with_env(external.env().to_vec())
        .with_cwd(external.cwd().to_path_buf())
        .with_stdio(stdin_fd, stdout_fd, stderr_fd)
        .add_close_fds(unrelated_fds);

        match launch.spawn(runtime, sys::SpawnMode::Foreground) {
            Ok(child) => {
                // The first child spawned under job control becomes the group
                // leader and the pipeline's foreground group.
                if job_control && group_leader.is_none() {
                    group_leader = Some(child.handle);
                    prepared.foreground_group = Some(child.handle);
                    prepared.foreground_group_display_pid = child.display_pid;
                }
                prepared.children.push(PipelineChild {
                    stage_index: index,
                    handle: child.handle,
                });
            }
            Err(err) => {
                prepared.statuses[index] = sys::spawn_error_exit_status(&err);
            }
        }
        close_external_stage_parent_pipe_ends(prepared, index, stage_count);
    }
}
/// Runs the in-shell stages of `pipeline` on scoped threads.
///
/// `PreparedExternal` stages are skipped here. `Failure` stages only record
/// their status. Each remaining stage gets its stdio resolved, a stage runtime
/// prepared (with a flag saying whether its work may mutate process globals),
/// and a scoped thread spawned to run it. A stage whose preparation fails
/// records the returned status and spawns no thread.
///
/// Once every stage has been visited, the pipeline pipes are closed and only
/// then are the stage threads joined.
fn execute_shell_pipeline_stages<R: Runtime>(
    state: &mut ShellState,
    runtime: &mut R,
    pipeline: &ShellExecPipeline<'_>,
    prepared: &mut PreparedPipelineExecution,
) {
    let stage_count = pipeline.stages().len();
    thread::scope(|scope| {
        let mut running: Vec<ShellPipelineStageHandle<'_>> = Vec::new();
        for (index, stage) in pipeline.stages().iter().enumerate() {
            let work = match stage {
                ExecPipelineStage::SimpleCommand(plan) => ShellPipelineStageWork::Simple(plan),
                ExecPipelineStage::Lazy(lazy) => ShellPipelineStageWork::Lazy(lazy),
                ExecPipelineStage::Failure { status } => {
                    prepared.statuses[index] = *status;
                    continue;
                }
                ExecPipelineStage::PreparedExternal(_) => continue,
            };
            let (stdin_fd, stdout_fd, stderr_fd) =
                pipeline_stage_stdio(state, index, stage_count, &prepared.pipes);
            let needs_global_guard = shell_pipeline_stage_may_mutate_process_globals(state, work);
            let preparation = prepare_shell_pipeline_stage(
                state,
                runtime,
                stdin_fd,
                stdout_fd,
                stderr_fd,
                needs_global_guard,
            );
            match preparation {
                Ok(stage_runtime) => {
                    let handle =
                        scope.spawn(move || run_shell_pipeline_stage(stage_runtime, work));
                    running.push((index, handle));
                }
                Err(status) => {
                    prepared.statuses[index] = status;
                }
            }
        }
        // Order matters: the pipes are closed before the stage threads are joined.
        close_pipeline_pipes(&mut prepared.pipes);
        join_shell_pipeline_stages(state, running, &mut prepared.statuses);
    });
}
/// Blocks until `child` is done or stopped and returns `(status, stop_signal)`.
///
/// - A wait error yields `(128, None)`.
/// - A stopped child yields the normalized `128 + signal` status together with
///   `Some(signal)`.
/// - A finished child yields its normalized status and `None`.
/// - A `Running` event simply triggers another wait.
fn wait_pipeline_child<R: Runtime>(
    runtime: &mut R,
    child: sys::ProcessHandle,
) -> (i32, Option<i32>) {
    loop {
        let Ok(event) = runtime.wait_process(child, sys::WaitMode::Block) else {
            return (128, None);
        };
        match shell_jobs::process_event_to_job_state(event) {
            JobState::Done(status) => return (normalize_exit_status(status), None),
            JobState::Stopped(sig) => {
                let status = normalize_exit_status(128_i64 + i64::from(sig));
                return (status, Some(sig));
            }
            JobState::Running => {}
        }
    }
}
/// Waits for every spawned pipeline child and computes the pipeline's status.
///
/// The pipeline pipes are closed first. Each child is then waited on and its
/// status stored at its stage index; the first stop signal seen is remembered.
///
/// If a child was stopped, a foreground group was recorded, and job control is
/// active, a new `Stopped` job is appended to the job table (taking the next
/// job id) and the normalized `128 + signal` status is returned. Otherwise the
/// result is the last entry of `prepared.statuses`, or `0` when there is none.
fn collect_pipeline_statuses<R: Runtime>(
    state: &mut ShellState,
    runtime: &mut R,
    prepared: &mut PreparedPipelineExecution,
) -> i32 {
    close_pipeline_pipes(&mut prepared.pipes);

    let mut first_stop: Option<i32> = None;
    for child in prepared.children.drain(..) {
        let (status, stopped) = wait_pipeline_child(runtime, child.handle);
        prepared.statuses[child.stage_index] = status;
        // Keep the earliest stop signal; later ones do not replace it.
        first_stop = first_stop.or(stopped);
    }

    let stopped_group = match (first_stop, prepared.foreground_group) {
        (Some(sig), Some(handle)) if shell_job_control_active(state) => Some((sig, handle)),
        _ => None,
    };
    let Some((sig, handle)) = stopped_group else {
        return prepared.statuses.last().copied().unwrap_or(0);
    };

    let job_id = state.job_table.next_job_id;
    state.job_table.next_job_id += 1;
    // Reuse the active command id when there is one, otherwise mint a new id.
    let command_id = state
        .active_command_id()
        .map(ToString::to_string)
        .unwrap_or_else(shell_events::new_command_id);
    state.job_table.jobs.push(ShellJob {
        job_id,
        handle,
        display_pid: prepared.foreground_group_display_pid,
        state: JobState::Stopped(sig),
        command_id,
        finish_emitted: false,
    });
    normalize_exit_status(128_i64 + i64::from(sig))
}
/// Decides whether the right operand of an and-or node should run.
///
/// Returns `true` for `And` when `left_status` is zero and for `Or` when it is
/// non-zero; every other combination returns `false`.
fn should_continue_and_or(op: BinOpType, left_status: i32) -> bool {
    let left_succeeded = left_status == 0;
    let runs_on_success = matches!(op, BinOpType::And);
    let runs_on_failure = matches!(op, BinOpType::Or);
    (runs_on_success && left_succeeded) || (runs_on_failure && !left_succeeded)
}
/// Executes one `&&` / `||` node and returns the resulting status.
///
/// The left operand runs inside an `AndOrLeft` errexit context and its status
/// is stored as the last status. The right operand is realized and run only
/// when the operator's short-circuit rule allows it and `state.exit_code` is
/// negative (presumably meaning no exit is pending; confirm against
/// `ShellState`). When the operator short-circuits, the next status is marked
/// errexit-exempt for the `AndOrLeft` context before returning the left status.
fn execute_and_or_binop<R: Runtime>(
    state: &mut ShellState,
    runtime: &mut R,
    op: BinOpType,
    left: &ExecAndOrList<'_>,
    right: &ExecLazyNode<'_>,
) -> i32 {
    state.push_errexit_context(ErrexitContext::AndOrLeft);
    let left_status = run_exec_and_or_list(state, runtime, left);
    state.pop_errexit_context();
    state.set_last_status(left_status);

    if !should_continue_and_or(op, left_status) {
        state.mark_next_status_errexit_exempt(ErrexitContext::AndOrLeft);
        return left_status;
    }
    if state.exit_code >= 0 {
        return left_status;
    }

    let realized = LazyNode::from_planned(right).realize_with_shell_state(state, runtime);
    run_exec_and_or_list(state, runtime, &realized)
}
/// Runs an executable and-or list and returns its exit status.
///
/// Returns `0` without executing anything when the `OPT_NOEXEC` option is set
/// and the shell is not interactive. Otherwise a pipeline is run directly and
/// a binary node is run through `execute_and_or_binop`.
pub(super) fn run_exec_and_or_list<R: Runtime>(
    state: &mut ShellState,
    runtime: &mut R,
    aol: &ExecAndOrList<'_>,
) -> i32 {
    let skip_execution = state.has_option(OPT_NOEXEC) && !state.interactive;
    if skip_execution {
        return 0;
    }
    match aol {
        ExecAndOrList::BinOp { op, left, right } => {
            execute_and_or_binop(state, runtime, *op, left, right)
        }
        ExecAndOrList::Pipeline(pipeline) => run_exec_pipeline(state, runtime, pipeline),
    }
}