1pub mod env_script;
8pub mod step_script;
9
10use std::collections::HashMap;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use openjd_expr::function_library::FunctionLibrary;
16use openjd_expr::ExprValue;
17use openjd_model::job::Action;
18use openjd_model::job::CancelationMode;
19use openjd_model::symbol_table::SymbolTable;
20use tokio::sync::mpsc;
21use tokio_util::sync::CancellationToken;
22
23use crate::action::ActionMessage;
24use crate::action::ActionState;
25use crate::action_filter::ActionFilter;
26use crate::error::SessionError;
27use crate::logging::log_subsection_banner;
28use crate::session_user::SessionUser;
29use crate::subprocess::{run_subprocess, SubprocessConfig, SubprocessResult};
30
/// How a running action is stopped when it has to be canceled.
///
/// Built from an action's optional `CancelationMode` by
/// [`cancel_method_for_action`].
#[derive(Debug, Clone)]
pub enum CancelMethod {
    /// Chosen when the action declares no cancelation mode, or declares
    /// `CancelationMode::Terminate`.
    Terminate,
    /// Chosen for `CancelationMode::NotifyThenTerminate`; `terminate_delay`
    /// is the action's notify period, or the caller-supplied default when
    /// the action does not give a usable one.
    NotifyThenTerminate { terminate_delay: Duration },
}

impl std::fmt::Display for CancelMethod {
    /// Renders the variant name; `NotifyThenTerminate` additionally shows its
    /// delay in whole seconds (e.g. `NotifyThenTerminate(30s)`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NotifyThenTerminate { terminate_delay } => {
                // `as_secs` reports whole seconds only.
                let secs = terminate_delay.as_secs();
                write!(f, "NotifyThenTerminate({secs}s)")
            }
            Self::Terminate => f.write_str("Terminate"),
        }
    }
}
60
/// Lifecycle state of a script runner.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScriptRunnerState {
    /// Initial state assigned by `ScriptRunnerBase::new`.
    Ready,
    /// Set when `ScriptRunnerBase::run_action` starts.
    Running,
    /// Not assigned anywhere in this file; presumably set by the concrete
    /// runners while a cancel is in flight — confirm against their code.
    Canceling,
    /// The action finished with `ActionState::Canceled`.
    Canceled,
    /// The action finished with `ActionState::Timeout`.
    Timeout,
    /// The action finished in any state other than success, canceled or
    /// timeout (see `state_from_action`).
    Failed,
    /// The action finished with `ActionState::Success`.
    Success,
}

impl std::fmt::Display for ScriptRunnerState {
    /// Writes the bare variant name (e.g. `Running`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Ready => "Ready",
            Self::Running => "Running",
            Self::Canceling => "Canceling",
            Self::Canceled => "Canceled",
            Self::Timeout => "Timeout",
            Self::Failed => "Failed",
            Self::Success => "Success",
        };
        f.write_str(label)
    }
}
86
/// State and configuration that `ScriptRunnerBase::run_action` uses to launch
/// a single action as a subprocess.
pub(crate) struct ScriptRunnerBase {
    /// Current lifecycle state; starts at `Ready` and is updated by
    /// `run_action`.
    pub state: ScriptRunnerState,
    /// Token whose clone is handed to `run_subprocess` when no cross-user
    /// helper is configured.
    pub cancel_token: CancellationToken,
    /// Optional watch receiver cloned into every `SubprocessConfig`.
    /// NOTE(review): the `Option<Duration>` payload presumably carries a
    /// cancel request with an optional grace period — confirm with the sender.
    pub cancel_request_rx: Option<tokio::sync::watch::Receiver<Option<Duration>>>,
    /// Session identifier passed to logging, the action filter and the
    /// subprocess runner.
    pub session_id: String,
    /// Working directory given to the launched subprocess.
    pub working_directory: PathBuf,
    /// Not read in this file; presumably the directory for files the session
    /// materializes — confirm against the concrete runners.
    pub files_directory: PathBuf,
    /// Not read in this file; defaults to `None` in `new`.
    pub helpers_directory: Option<PathBuf>,
    /// Optional session user forwarded to the subprocess configuration.
    pub user: Option<Arc<dyn SessionUser>>,
    /// Passed to `ActionFilter::new`; defaults to `false` in `new`.
    pub redactions_enabled: bool,
    /// Values registered with the action filter (`add_redacted_values`)
    /// before the action is launched.
    pub initial_redacted_values: Vec<String>,
    /// Forwarded to `SubprocessConfig::debug_collect_stdout`.
    pub debug_collect_stdout: bool,
    /// Passed to `ActionFilter::new`; defaults to `true` in `new`.
    pub echo_openjd_directives: bool,
    /// When `Some`, `run_action` routes the action through
    /// `cross_user_helper::run_via_helper` instead of `run_subprocess`.
    #[cfg(unix)]
    pub helper: Option<crate::cross_user_helper::CrossUserHelper>,
    /// Windows counterpart of the helper above; same role in `run_action`.
    #[cfg(windows)]
    pub helper: Option<crate::cross_user_helper::CrossUserHelperWin>,
    /// Optional file handle passed to `run_via_helper`.
    /// NOTE(review): presumably used to signal cancelation to the helper
    /// process — confirm in `cross_user_helper`.
    pub cancel_writer: Option<std::fs::File>,
}
114
115impl ScriptRunnerBase {
116 pub fn new(
117 session_id: &str,
118 working_directory: PathBuf,
119 files_directory: PathBuf,
120 user: Option<Arc<dyn SessionUser>>,
121 ) -> Self {
122 Self {
123 state: ScriptRunnerState::Ready,
124 cancel_token: CancellationToken::new(),
125 cancel_request_rx: None,
126 session_id: session_id.to_string(),
127 working_directory,
128 files_directory,
129 helpers_directory: None,
130 user,
131 redactions_enabled: false,
132 initial_redacted_values: Vec::new(),
133 debug_collect_stdout: false,
134 echo_openjd_directives: true,
135 helper: None,
136 cancel_writer: None,
137 }
138 }
139
140 #[allow(clippy::too_many_arguments)]
142 pub async fn run_action(
143 &mut self,
144 action: &Action,
145 symtab: &SymbolTable,
146 library: Option<&FunctionLibrary>,
147 env_vars: &HashMap<String, Option<String>>,
148 message_tx: mpsc::UnboundedSender<ActionMessage>,
149 default_timeout: Option<Duration>,
150 default_cancel_period: Duration,
151 ) -> Result<SubprocessResult, SessionError> {
152 self.state = ScriptRunnerState::Running;
153 log_subsection_banner(&self.session_id, "Phase: Running action");
154 let args = resolve_action_args(action, symtab, library)?;
155 let timeout = resolve_action_timeout(action, symtab, library, default_timeout)?;
156 let cancel_method = cancel_method_for_action(&action.cancelation, default_cancel_period);
157 let config = SubprocessConfig {
158 args,
159 env_vars: env_vars.clone(),
160 working_dir: Some(self.working_directory.clone()),
161 timeout,
162 user: self.user.clone(),
163 cancel_method,
164 cancel_request_rx: self.cancel_request_rx.clone(),
165 debug_collect_stdout: self.debug_collect_stdout,
166 };
167 let mut filter = ActionFilter::new(
168 &self.session_id,
169 self.echo_openjd_directives,
170 self.redactions_enabled,
171 );
172 filter.add_redacted_values(&self.initial_redacted_values);
173
174 if let Some(ref mut helper) = self.helper {
175 let result = crate::cross_user_helper::run_via_helper(
176 helper,
177 &config,
178 &mut filter,
179 &self.session_id,
180 message_tx,
181 self.cancel_writer.as_ref(),
182 )
183 .await?;
184 self.state = state_from_action(result.state);
185 return Ok(result);
186 }
187
188 let result = run_subprocess(
189 config,
190 &mut filter,
191 &self.session_id,
192 message_tx,
193 self.cancel_token.clone(),
194 )
195 .await?;
196
197 self.state = state_from_action(result.state);
198 Ok(result)
199 }
200}
201
202fn state_from_action(action_state: ActionState) -> ScriptRunnerState {
203 match action_state {
204 ActionState::Success => ScriptRunnerState::Success,
205 ActionState::Canceled => ScriptRunnerState::Canceled,
206 ActionState::Timeout => ScriptRunnerState::Timeout,
207 _ => ScriptRunnerState::Failed,
208 }
209}
210
211pub(crate) fn cancel_method_for_action(
213 cancelation: &Option<CancelationMode>,
214 default_notify_period: Duration,
215) -> CancelMethod {
216 match cancelation {
217 None | Some(CancelationMode::Terminate) => CancelMethod::Terminate,
218 Some(CancelationMode::NotifyThenTerminate {
219 notify_period_in_seconds,
220 }) => {
221 let period = notify_period_in_seconds
222 .as_ref()
223 .and_then(|fs| fs.raw().parse::<u64>().ok())
224 .map(Duration::from_secs)
225 .unwrap_or(default_notify_period);
226 CancelMethod::NotifyThenTerminate {
227 terminate_delay: period,
228 }
229 }
230 }
231}
232
233pub(crate) fn resolve_action_timeout(
235 action: &Action,
236 symtab: &SymbolTable,
237 library: Option<&FunctionLibrary>,
238 default: Option<Duration>,
239) -> Result<Option<Duration>, SessionError> {
240 match &action.timeout {
241 Some(fmt) => {
242 let s = fmt
243 .resolve_string_with(
244 symtab,
245 &openjd_expr::FormatStringOptions::new().with_library(library),
246 )
247 .map_err(|e| SessionError::FormatString {
248 context: "timeout".into(),
249 reason: e.to_string(),
250 })?;
251 let secs: u64 = s.parse().map_err(|_| SessionError::FormatString {
252 context: "timeout".into(),
253 reason: format!("timeout must be a positive integer, got '{s}'"),
254 })?;
255 if secs == 0 {
256 return Err(SessionError::FormatString {
257 context: "timeout".into(),
258 reason: "timeout must be a positive integer, got '0'".into(),
259 });
260 }
261 Ok(Some(Duration::from_secs(secs)))
262 }
263 None => Ok(default),
264 }
265}
266
267pub(crate) fn resolve_action_args(
269 action: &Action,
270 symtab: &SymbolTable,
271 library: Option<&FunctionLibrary>,
272) -> Result<Vec<String>, SessionError> {
273 let command = action
274 .command
275 .resolve_string_with(
276 symtab,
277 &openjd_expr::FormatStringOptions::new().with_library(library),
278 )
279 .map_err(|e| SessionError::FormatString {
280 context: "command".into(),
281 reason: e.to_string(),
282 })?;
283 let mut args = vec![command];
284 if let Some(arg_fmts) = &action.args {
285 for fs in arg_fmts {
286 if let Ok(val) = fs.resolve_with(
287 symtab,
288 &openjd_expr::FormatStringOptions::new().with_library(library),
289 ) {
290 match val {
291 ExprValue::Null => continue,
292 val if val.is_list() => {
293 if let Some(elements) = val.list_elements() {
294 for elem in &elements {
295 args.push(elem.to_display_string());
296 }
297 }
298 continue;
299 }
300 val => args.push(val.to_display_string()),
301 }
302 } else {
303 let s = fs
304 .resolve_string_with(
305 symtab,
306 &openjd_expr::FormatStringOptions::new().with_library(library),
307 )
308 .map_err(|e| SessionError::FormatString {
309 context: "argument".into(),
310 reason: e.to_string(),
311 })?;
312 args.push(s);
313 }
314 }
315 }
316 Ok(args)
317}
318
#[cfg(test)]
mod tests {
    use super::*;

    /// Every runner state renders as its bare variant name.
    #[test]
    fn script_runner_state_display() {
        let expected = [
            (ScriptRunnerState::Ready, "Ready"),
            (ScriptRunnerState::Running, "Running"),
            (ScriptRunnerState::Canceling, "Canceling"),
            (ScriptRunnerState::Canceled, "Canceled"),
            (ScriptRunnerState::Timeout, "Timeout"),
            (ScriptRunnerState::Failed, "Failed"),
            (ScriptRunnerState::Success, "Success"),
        ];
        for (state, label) in expected {
            assert_eq!(state.to_string(), label);
        }
    }

    /// Cancel methods render their name, plus the delay in seconds when set.
    #[test]
    fn cancel_method_display() {
        let terminate = CancelMethod::Terminate;
        assert_eq!(terminate.to_string(), "Terminate");

        let notify = CancelMethod::NotifyThenTerminate {
            terminate_delay: Duration::from_secs(30),
        };
        assert_eq!(notify.to_string(), "NotifyThenTerminate(30s)");
    }
}