1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
//! Exec verb implementation for TaskExecutor
//!
//! Contains `run_exec` for shell command execution.
use std::sync::Arc;
use std::time::Instant;
use tracing::instrument;
use crate::ast::ExecParams;
use crate::binding::{template_resolve, ResolvedBindings};
use crate::error::NikaError;
use crate::event::EventKind;
use crate::runtime::policy::PolicyDecision;
use crate::store::RunContext;
use crate::util::EXEC_TIMEOUT;
use super::verbs::redact_for_event;
use super::TaskExecutor;
impl TaskExecutor {
#[instrument(skip(self, bindings, datastore), fields(%task_id))]
pub(super) async fn run_exec(
&self,
task_id: &Arc<str>,
params: &ExecParams,
bindings: &ResolvedBindings,
datastore: &RunContext,
) -> Result<String, NikaError> {
// Resolve {{with.alias}} templates
// Note: Shell escaping is NOT applied by default.
// For values that need shell escaping, use {{with.alias|shell}} syntax.
let resolved_cmd = template_resolve(¶ms.command, bindings, datastore)?;
// SECURITY CHECK: validate command for control characters and blocklist
// In shell mode, also block command substitution ($(), backticks)
let is_shell = params.shell == Some(true);
crate::runtime::security::validate_exec_command_with_shell(&resolved_cmd, is_shell)?;
// POLICY CHECK: exec verb
let policy_decision = self.policy_enforcer.read().check_exec(&resolved_cmd);
if let PolicyDecision::Block(reason) = policy_decision {
// EMIT: PolicyBlocked
self.event_log.emit(EventKind::PolicyBlocked {
task_id: Arc::clone(task_id),
verb: "exec".to_string(),
policy_type: "command_blocklist".to_string(),
reason: reason.clone(),
});
tracing::warn!(
task_id = %task_id,
command = %resolved_cmd,
reason = %reason,
"exec: blocked by policy"
);
return Err(NikaError::PolicyViolation { reason });
}
// EMIT: TemplateResolved (redacted to avoid leaking secrets)
self.event_log.emit(EventKind::TemplateResolved {
task_id: Arc::clone(task_id),
template: params.command.clone(),
result: redact_for_event(&resolved_cmd),
});
// Use per-task timeout if specified, otherwise fall back to global default
let exec_deadline = params
.timeout
.map(std::time::Duration::from_secs)
.unwrap_or(EXEC_TIMEOUT);
// Shell-free execution by default, opt-in to shell mode
// Support for env vars
let exec_start = Instant::now();
let output =
if params.shell == Some(true) {
// Shell mode: use sh -c (preserves shell metacharacters like ;, |, &&)
tracing::debug!(task_id = %task_id, "exec: using shell mode (sh -c)");
let mut cmd = tokio::process::Command::new("sh");
cmd.arg("-c").arg(resolved_cmd.as_ref());
// Pipe stdout/stderr for capture (required by spawn + wait_with_output)
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
// Strip sensitive env vars from child process
crate::runtime::security::strip_sensitive_env_vars(&mut cmd);
// Set working directory if specified (with path traversal protection)
if let Some(ref cwd) = params.cwd {
let resolved = std::path::Path::new(cwd).canonicalize().map_err(|e| {
NikaError::ExecError {
reason: format!("Invalid cwd '{}': {}", cwd, e),
}
})?;
let working_dir = self
.workflow_base_dir
.canonicalize()
.unwrap_or_else(|_| self.workflow_base_dir.clone());
if !resolved.starts_with(&working_dir) {
return Err(NikaError::ExecError {
reason: format!(
"Security: exec cwd '{}' escapes working directory '{}'",
cwd,
working_dir.display()
),
});
}
cmd.current_dir(resolved);
}
// Add environment variables if specified (validate first)
if let Some(ref env_vars) = params.env {
let pairs: Vec<(String, String)> = env_vars
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
crate::runtime::security::validate_env_vars(&pairs)?;
for (key, value) in env_vars {
let resolved_value = template_resolve(value, bindings, datastore)?;
cmd.env(key, resolved_value.as_ref());
}
}
// kill_on_drop ensures child is killed when dropped on timeout (prevents orphans)
cmd.kill_on_drop(true);
let child = cmd.spawn().map_err(|e| NikaError::ExecError {
reason: format!("Failed to spawn command: {}", e),
})?;
match tokio::time::timeout(exec_deadline, child.wait_with_output()).await {
Ok(Ok(out)) => out,
Ok(Err(e)) => {
return Err(NikaError::ExecError {
reason: format!("Failed to execute command: {}", e),
});
}
Err(_) => {
// child is dropped here -> kill_on_drop sends SIGKILL
return Err(NikaError::ExecError {
reason: format!("Command timed out after {}s", exec_deadline.as_secs()),
});
}
}
} else {
// Shell-free mode (default): parse with shlex, execute directly
tracing::debug!(task_id = %task_id, "exec: using shell-free mode (shlex)");
let parts = shlex::split(&resolved_cmd).ok_or_else(|| NikaError::ExecError {
reason: format!(
"Failed to parse command (unbalanced quotes?): {}",
resolved_cmd
),
})?;
if parts.is_empty() {
return Err(NikaError::ExecError {
reason: "Empty command".to_string(),
});
}
let mut cmd = tokio::process::Command::new(&parts[0]);
cmd.args(&parts[1..]);
// Pipe stdout/stderr for capture (required by spawn + wait_with_output)
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
// Strip sensitive env vars from child process
crate::runtime::security::strip_sensitive_env_vars(&mut cmd);
// Set working directory if specified (with path traversal protection)
if let Some(ref cwd) = params.cwd {
let resolved = std::path::Path::new(cwd).canonicalize().map_err(|e| {
NikaError::ExecError {
reason: format!("Invalid cwd '{}': {}", cwd, e),
}
})?;
let working_dir = self
.workflow_base_dir
.canonicalize()
.unwrap_or_else(|_| self.workflow_base_dir.clone());
if !resolved.starts_with(&working_dir) {
return Err(NikaError::ExecError {
reason: format!(
"Security: exec cwd '{}' escapes working directory '{}'",
cwd,
working_dir.display()
),
});
}
cmd.current_dir(resolved);
}
// Add environment variables if specified (validate first)
if let Some(ref env_vars) = params.env {
let pairs: Vec<(String, String)> = env_vars
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
crate::runtime::security::validate_env_vars(&pairs)?;
for (key, value) in env_vars {
let resolved_value = template_resolve(value, bindings, datastore)?;
cmd.env(key, resolved_value.as_ref());
}
}
// kill_on_drop ensures child is killed when dropped on timeout (prevents orphans)
cmd.kill_on_drop(true);
let child = cmd.spawn().map_err(|e| NikaError::ExecError {
reason: format!("Failed to spawn command: {}", e),
})?;
match tokio::time::timeout(exec_deadline, child.wait_with_output()).await {
Ok(Ok(out)) => out,
Ok(Err(e)) => {
return Err(NikaError::ExecError {
reason: format!("Failed to execute command: {}", e),
});
}
Err(_) => {
// child is dropped here -> kill_on_drop sends SIGKILL
return Err(NikaError::ExecError {
reason: format!("Command timed out after {}s", exec_deadline.as_secs()),
});
}
}
};
// EMIT: ExecCompleted (emitted for both success and failure)
let exec_duration_ms = exec_start.elapsed().as_millis() as u64;
let exit_code = output.status.code().unwrap_or(-1);
self.event_log.emit(EventKind::ExecCompleted {
task_id: Arc::clone(task_id),
exit_code,
stdout_len: output.stdout.len(),
stderr_len: output.stderr.len(),
duration_ms: exec_duration_ms,
});
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(NikaError::ExecError {
reason: format!("Command failed: {}", stderr),
});
}
Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
}
}