Skip to main content

openjd_sessions/runner/
mod.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// Copyright by contributors to this project.
3// SPDX-License-Identifier: (Apache-2.0 OR MIT)
4
5//! Script runners for environment and step actions.
6
7pub mod env_script;
8pub mod step_script;
9
10use std::collections::HashMap;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::time::Duration;
14
15use openjd_expr::function_library::FunctionLibrary;
16use openjd_expr::ExprValue;
17use openjd_model::job::Action;
18use openjd_model::job::CancelationMode;
19use openjd_model::symbol_table::SymbolTable;
20use tokio::sync::mpsc;
21use tokio_util::sync::CancellationToken;
22
23use crate::action::ActionMessage;
24use crate::action::ActionState;
25use crate::action_filter::ActionFilter;
26use crate::error::SessionError;
27use crate::logging::log_subsection_banner;
28use crate::session_user::SessionUser;
29use crate::subprocess::{run_subprocess, SubprocessConfig, SubprocessResult};
30
/// Method for canceling a running action.
///
/// ```
/// use openjd_sessions::CancelMethod;
/// use std::time::Duration;
///
/// let method = CancelMethod::NotifyThenTerminate {
///     terminate_delay: Duration::from_secs(30),
/// };
/// assert!(matches!(method, CancelMethod::NotifyThenTerminate { .. }));
/// ```
#[derive(Debug, Clone)]
pub enum CancelMethod {
    /// Immediately terminate via SIGKILL.
    Terminate,
    /// Send SIGTERM, wait for grace period, then SIGKILL.
    NotifyThenTerminate { terminate_delay: Duration },
}

/// Human-readable form used in logs.
///
/// `Terminate` renders as `Terminate`. `NotifyThenTerminate` renders its
/// grace period in seconds, e.g. `NotifyThenTerminate(30s)`. Whole-second
/// delays print as an integer; sub-second precision is kept
/// (`NotifyThenTerminate(1.5s)`) rather than being truncated, so the logged
/// grace period matches the one actually applied.
impl std::fmt::Display for CancelMethod {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Terminate => write!(f, "Terminate"),
            Self::NotifyThenTerminate { terminate_delay } => {
                if terminate_delay.subsec_nanos() == 0 {
                    write!(f, "NotifyThenTerminate({}s)", terminate_delay.as_secs())
                } else {
                    // `as_secs()` alone would drop the fractional part
                    // (e.g. 500ms -> "0s").
                    write!(f, "NotifyThenTerminate({}s)", terminate_delay.as_secs_f64())
                }
            }
        }
    }
}
60
/// Lifecycle state of a script runner.
///
/// `ScriptRunnerBase::new` starts a runner in `Ready`, and `run_action` moves
/// it to `Running` before mapping the finished action onto one of
/// `Canceled`, `Timeout`, `Failed`, or `Success`. `Canceling` is not assigned
/// in this module; presumably the composing runners set it while a cancel is
/// in flight (confirm in `env_script` / `step_script`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScriptRunnerState {
    Ready,
    Running,
    Canceling,
    Canceled,
    Timeout,
    Failed,
    Success,
}

/// Renders the variant name verbatim (e.g. `Running`).
impl std::fmt::Display for ScriptRunnerState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Ready => "Ready",
            Self::Running => "Running",
            Self::Canceling => "Canceling",
            Self::Canceled => "Canceled",
            Self::Timeout => "Timeout",
            Self::Failed => "Failed",
            Self::Success => "Success",
        };
        f.write_str(label)
    }
}
86
87/// Shared infrastructure for script runners.
88///
89/// Both `EnvironmentScriptRunner` and `StepScriptRunner` compose this struct
90/// to avoid duplicating constructor, builder, cancel/state, and subprocess
91/// execution logic.
92pub(crate) struct ScriptRunnerBase {
93    pub state: ScriptRunnerState,
94    pub cancel_token: CancellationToken,
95    pub cancel_request_rx: Option<tokio::sync::watch::Receiver<Option<Duration>>>,
96    pub session_id: String,
97    pub working_directory: PathBuf,
98    pub files_directory: PathBuf,
99    pub helpers_directory: Option<PathBuf>,
100    pub user: Option<Arc<dyn SessionUser>>,
101    pub redactions_enabled: bool,
102    pub initial_redacted_values: Vec<String>,
103    pub debug_collect_stdout: bool,
104    /// Whether to echo `openjd_*` directive lines to the log. See
105    /// [`crate::session::SessionConfig::echo_openjd_directives`]. Defaults
106    /// to `true` to match the Python reference implementation.
107    pub echo_openjd_directives: bool,
108    #[cfg(unix)]
109    pub helper: Option<crate::cross_user_helper::CrossUserHelper>,
110    #[cfg(windows)]
111    pub helper: Option<crate::cross_user_helper::CrossUserHelperWin>,
112    pub cancel_writer: Option<std::fs::File>,
113}
114
115impl ScriptRunnerBase {
116    pub fn new(
117        session_id: &str,
118        working_directory: PathBuf,
119        files_directory: PathBuf,
120        user: Option<Arc<dyn SessionUser>>,
121    ) -> Self {
122        Self {
123            state: ScriptRunnerState::Ready,
124            cancel_token: CancellationToken::new(),
125            cancel_request_rx: None,
126            session_id: session_id.to_string(),
127            working_directory,
128            files_directory,
129            helpers_directory: None,
130            user,
131            redactions_enabled: false,
132            initial_redacted_values: Vec::new(),
133            debug_collect_stdout: false,
134            echo_openjd_directives: true,
135            helper: None,
136            cancel_writer: None,
137        }
138    }
139
140    /// Run a resolved action as a subprocess, updating runner state.
141    #[allow(clippy::too_many_arguments)]
142    pub async fn run_action(
143        &mut self,
144        action: &Action,
145        symtab: &SymbolTable,
146        library: Option<&FunctionLibrary>,
147        env_vars: &HashMap<String, Option<String>>,
148        message_tx: mpsc::UnboundedSender<ActionMessage>,
149        default_timeout: Option<Duration>,
150        default_cancel_period: Duration,
151    ) -> Result<SubprocessResult, SessionError> {
152        self.state = ScriptRunnerState::Running;
153        log_subsection_banner(&self.session_id, "Phase: Running action");
154        let args = resolve_action_args(action, symtab, library)?;
155        let timeout = resolve_action_timeout(action, symtab, library, default_timeout)?;
156        let cancel_method = cancel_method_for_action(&action.cancelation, default_cancel_period);
157        let config = SubprocessConfig {
158            args,
159            env_vars: env_vars.clone(),
160            working_dir: Some(self.working_directory.clone()),
161            timeout,
162            user: self.user.clone(),
163            cancel_method,
164            cancel_request_rx: self.cancel_request_rx.clone(),
165            debug_collect_stdout: self.debug_collect_stdout,
166        };
167        let mut filter = ActionFilter::new(
168            &self.session_id,
169            self.echo_openjd_directives,
170            self.redactions_enabled,
171        );
172        filter.add_redacted_values(&self.initial_redacted_values);
173
174        if let Some(ref mut helper) = self.helper {
175            let result = crate::cross_user_helper::run_via_helper(
176                helper,
177                &config,
178                &mut filter,
179                &self.session_id,
180                message_tx,
181                self.cancel_writer.as_ref(),
182            )
183            .await?;
184            self.state = state_from_action(result.state);
185            return Ok(result);
186        }
187
188        let result = run_subprocess(
189            config,
190            &mut filter,
191            &self.session_id,
192            message_tx,
193            self.cancel_token.clone(),
194        )
195        .await?;
196
197        self.state = state_from_action(result.state);
198        Ok(result)
199    }
200}
201
202fn state_from_action(action_state: ActionState) -> ScriptRunnerState {
203    match action_state {
204        ActionState::Success => ScriptRunnerState::Success,
205        ActionState::Canceled => ScriptRunnerState::Canceled,
206        ActionState::Timeout => ScriptRunnerState::Timeout,
207        _ => ScriptRunnerState::Failed,
208    }
209}
210
211/// Determine the cancel method from an action's cancelation field.
212pub(crate) fn cancel_method_for_action(
213    cancelation: &Option<CancelationMode>,
214    default_notify_period: Duration,
215) -> CancelMethod {
216    match cancelation {
217        None | Some(CancelationMode::Terminate) => CancelMethod::Terminate,
218        Some(CancelationMode::NotifyThenTerminate {
219            notify_period_in_seconds,
220        }) => {
221            let period = notify_period_in_seconds
222                .as_ref()
223                .and_then(|fs| fs.raw().parse::<u64>().ok())
224                .map(Duration::from_secs)
225                .unwrap_or(default_notify_period);
226            CancelMethod::NotifyThenTerminate {
227                terminate_delay: period,
228            }
229        }
230    }
231}
232
233/// Resolve an Action's timeout field to a Duration, falling back to a default.
234pub(crate) fn resolve_action_timeout(
235    action: &Action,
236    symtab: &SymbolTable,
237    library: Option<&FunctionLibrary>,
238    default: Option<Duration>,
239) -> Result<Option<Duration>, SessionError> {
240    match &action.timeout {
241        Some(fmt) => {
242            let s = fmt
243                .resolve_string_with(
244                    symtab,
245                    &openjd_expr::FormatStringOptions::new().with_library(library),
246                )
247                .map_err(|e| SessionError::FormatString {
248                    context: "timeout".into(),
249                    reason: e.to_string(),
250                })?;
251            let secs: u64 = s.parse().map_err(|_| SessionError::FormatString {
252                context: "timeout".into(),
253                reason: format!("timeout must be a positive integer, got '{s}'"),
254            })?;
255            if secs == 0 {
256                return Err(SessionError::FormatString {
257                    context: "timeout".into(),
258                    reason: "timeout must be a positive integer, got '0'".into(),
259                });
260            }
261            Ok(Some(Duration::from_secs(secs)))
262        }
263        None => Ok(default),
264    }
265}
266
267/// Resolve an Action's command and args into a flat argument list.
268pub(crate) fn resolve_action_args(
269    action: &Action,
270    symtab: &SymbolTable,
271    library: Option<&FunctionLibrary>,
272) -> Result<Vec<String>, SessionError> {
273    let command = action
274        .command
275        .resolve_string_with(
276            symtab,
277            &openjd_expr::FormatStringOptions::new().with_library(library),
278        )
279        .map_err(|e| SessionError::FormatString {
280            context: "command".into(),
281            reason: e.to_string(),
282        })?;
283    let mut args = vec![command];
284    if let Some(arg_fmts) = &action.args {
285        for fs in arg_fmts {
286            if let Ok(val) = fs.resolve_with(
287                symtab,
288                &openjd_expr::FormatStringOptions::new().with_library(library),
289            ) {
290                match val {
291                    ExprValue::Null => continue,
292                    val if val.is_list() => {
293                        if let Some(elements) = val.list_elements() {
294                            for elem in &elements {
295                                args.push(elem.to_display_string());
296                            }
297                        }
298                        continue;
299                    }
300                    val => args.push(val.to_display_string()),
301                }
302            } else {
303                let s = fs
304                    .resolve_string_with(
305                        symtab,
306                        &openjd_expr::FormatStringOptions::new().with_library(library),
307                    )
308                    .map_err(|e| SessionError::FormatString {
309                        context: "argument".into(),
310                        reason: e.to_string(),
311                    })?;
312                args.push(s);
313            }
314        }
315    }
316    Ok(args)
317}
318
#[cfg(test)]
mod tests {
    use super::*;

    /// Every runner state renders as its bare variant name.
    #[test]
    fn script_runner_state_display() {
        let expectations = [
            (ScriptRunnerState::Ready, "Ready"),
            (ScriptRunnerState::Running, "Running"),
            (ScriptRunnerState::Canceling, "Canceling"),
            (ScriptRunnerState::Canceled, "Canceled"),
            (ScriptRunnerState::Timeout, "Timeout"),
            (ScriptRunnerState::Failed, "Failed"),
            (ScriptRunnerState::Success, "Success"),
        ];
        for (state, label) in expectations {
            assert_eq!(state.to_string(), label, "unexpected label for {state:?}");
        }
    }

    /// Cancel methods render their name, plus the grace period when present.
    #[test]
    fn cancel_method_display() {
        let notify = CancelMethod::NotifyThenTerminate {
            terminate_delay: Duration::from_secs(30),
        };
        assert_eq!(format!("{}", CancelMethod::Terminate), "Terminate");
        assert_eq!(format!("{notify}"), "NotifyThenTerminate(30s)");
    }
}