pub mod env_script;
pub mod step_script;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use openjd_expr::function_library::FunctionLibrary;
use openjd_expr::ExprValue;
use openjd_model::job::Action;
use openjd_model::job::CancelationMode;
use openjd_model::symbol_table::SymbolTable;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use crate::action::ActionMessage;
use crate::action::ActionState;
use crate::action_filter::ActionFilter;
use crate::error::SessionError;
use crate::logging::log_subsection_banner;
use crate::session_user::SessionUser;
use crate::subprocess::{run_subprocess, SubprocessConfig, SubprocessResult};
#[derive(Debug, Clone)]
pub enum CancelMethod {
Terminate,
NotifyThenTerminate { terminate_delay: Duration },
}
impl std::fmt::Display for CancelMethod {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Terminate => write!(f, "Terminate"),
Self::NotifyThenTerminate { terminate_delay } => {
write!(f, "NotifyThenTerminate({}s)", terminate_delay.as_secs())
}
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScriptRunnerState {
Ready,
Running,
Canceling,
Canceled,
Timeout,
Failed,
Success,
}
impl std::fmt::Display for ScriptRunnerState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Ready => write!(f, "Ready"),
Self::Running => write!(f, "Running"),
Self::Canceling => write!(f, "Canceling"),
Self::Canceled => write!(f, "Canceled"),
Self::Timeout => write!(f, "Timeout"),
Self::Failed => write!(f, "Failed"),
Self::Success => write!(f, "Success"),
}
}
}
pub(crate) struct ScriptRunnerBase {
pub state: ScriptRunnerState,
pub cancel_token: CancellationToken,
pub cancel_request_rx: Option<tokio::sync::watch::Receiver<Option<Duration>>>,
pub session_id: String,
pub working_directory: PathBuf,
pub files_directory: PathBuf,
pub helpers_directory: Option<PathBuf>,
pub user: Option<Arc<dyn SessionUser>>,
pub redactions_enabled: bool,
pub initial_redacted_values: Vec<String>,
pub debug_collect_stdout: bool,
pub echo_openjd_directives: bool,
#[cfg(unix)]
pub helper: Option<crate::cross_user_helper::CrossUserHelper>,
#[cfg(windows)]
pub helper: Option<crate::cross_user_helper::CrossUserHelperWin>,
pub cancel_writer: Option<std::fs::File>,
}
impl ScriptRunnerBase {
pub fn new(
session_id: &str,
working_directory: PathBuf,
files_directory: PathBuf,
user: Option<Arc<dyn SessionUser>>,
) -> Self {
Self {
state: ScriptRunnerState::Ready,
cancel_token: CancellationToken::new(),
cancel_request_rx: None,
session_id: session_id.to_string(),
working_directory,
files_directory,
helpers_directory: None,
user,
redactions_enabled: false,
initial_redacted_values: Vec::new(),
debug_collect_stdout: false,
echo_openjd_directives: true,
helper: None,
cancel_writer: None,
}
}
#[allow(clippy::too_many_arguments)]
pub async fn run_action(
&mut self,
action: &Action,
symtab: &SymbolTable,
library: Option<&FunctionLibrary>,
env_vars: &HashMap<String, Option<String>>,
message_tx: mpsc::UnboundedSender<ActionMessage>,
default_timeout: Option<Duration>,
default_cancel_period: Duration,
) -> Result<SubprocessResult, SessionError> {
self.state = ScriptRunnerState::Running;
log_subsection_banner(&self.session_id, "Phase: Running action");
let args = resolve_action_args(action, symtab, library)?;
let timeout = resolve_action_timeout(action, symtab, library, default_timeout)?;
let cancel_method = cancel_method_for_action(&action.cancelation, default_cancel_period);
let config = SubprocessConfig {
args,
env_vars: env_vars.clone(),
working_dir: Some(self.working_directory.clone()),
timeout,
user: self.user.clone(),
cancel_method,
cancel_request_rx: self.cancel_request_rx.clone(),
debug_collect_stdout: self.debug_collect_stdout,
};
let mut filter = ActionFilter::new(
&self.session_id,
self.echo_openjd_directives,
self.redactions_enabled,
);
filter.add_redacted_values(&self.initial_redacted_values);
if let Some(ref mut helper) = self.helper {
let result = crate::cross_user_helper::run_via_helper(
helper,
&config,
&mut filter,
&self.session_id,
message_tx,
self.cancel_writer.as_ref(),
)
.await?;
self.state = state_from_action(result.state);
return Ok(result);
}
let result = run_subprocess(
config,
&mut filter,
&self.session_id,
message_tx,
self.cancel_token.clone(),
)
.await?;
self.state = state_from_action(result.state);
Ok(result)
}
}
fn state_from_action(action_state: ActionState) -> ScriptRunnerState {
match action_state {
ActionState::Success => ScriptRunnerState::Success,
ActionState::Canceled => ScriptRunnerState::Canceled,
ActionState::Timeout => ScriptRunnerState::Timeout,
_ => ScriptRunnerState::Failed,
}
}
pub(crate) fn cancel_method_for_action(
cancelation: &Option<CancelationMode>,
default_notify_period: Duration,
) -> CancelMethod {
match cancelation {
None | Some(CancelationMode::Terminate) => CancelMethod::Terminate,
Some(CancelationMode::NotifyThenTerminate {
notify_period_in_seconds,
}) => {
let period = notify_period_in_seconds
.as_ref()
.and_then(|fs| fs.raw().parse::<u64>().ok())
.map(Duration::from_secs)
.unwrap_or(default_notify_period);
CancelMethod::NotifyThenTerminate {
terminate_delay: period,
}
}
}
}
pub(crate) fn resolve_action_timeout(
action: &Action,
symtab: &SymbolTable,
library: Option<&FunctionLibrary>,
default: Option<Duration>,
) -> Result<Option<Duration>, SessionError> {
match &action.timeout {
Some(fmt) => {
let s = fmt
.resolve_string_with(
symtab,
&openjd_expr::FormatStringOptions::new().with_library(library),
)
.map_err(|e| SessionError::FormatString {
context: "timeout".into(),
reason: e.to_string(),
})?;
let secs: u64 = s.parse().map_err(|_| SessionError::FormatString {
context: "timeout".into(),
reason: format!("timeout must be a positive integer, got '{s}'"),
})?;
if secs == 0 {
return Err(SessionError::FormatString {
context: "timeout".into(),
reason: "timeout must be a positive integer, got '0'".into(),
});
}
Ok(Some(Duration::from_secs(secs)))
}
None => Ok(default),
}
}
pub(crate) fn resolve_action_args(
action: &Action,
symtab: &SymbolTable,
library: Option<&FunctionLibrary>,
) -> Result<Vec<String>, SessionError> {
let command = action
.command
.resolve_string_with(
symtab,
&openjd_expr::FormatStringOptions::new().with_library(library),
)
.map_err(|e| SessionError::FormatString {
context: "command".into(),
reason: e.to_string(),
})?;
let mut args = vec![command];
if let Some(arg_fmts) = &action.args {
for fs in arg_fmts {
if let Ok(val) = fs.resolve_with(
symtab,
&openjd_expr::FormatStringOptions::new().with_library(library),
) {
match val {
ExprValue::Null => continue,
val if val.is_list() => {
if let Some(elements) = val.list_elements() {
for elem in &elements {
args.push(elem.to_display_string());
}
}
continue;
}
val => args.push(val.to_display_string()),
}
} else {
let s = fs
.resolve_string_with(
symtab,
&openjd_expr::FormatStringOptions::new().with_library(library),
)
.map_err(|e| SessionError::FormatString {
context: "argument".into(),
reason: e.to_string(),
})?;
args.push(s);
}
}
}
Ok(args)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn script_runner_state_display() {
assert_eq!(ScriptRunnerState::Ready.to_string(), "Ready");
assert_eq!(ScriptRunnerState::Running.to_string(), "Running");
assert_eq!(ScriptRunnerState::Canceling.to_string(), "Canceling");
assert_eq!(ScriptRunnerState::Canceled.to_string(), "Canceled");
assert_eq!(ScriptRunnerState::Timeout.to_string(), "Timeout");
assert_eq!(ScriptRunnerState::Failed.to_string(), "Failed");
assert_eq!(ScriptRunnerState::Success.to_string(), "Success");
}
#[test]
fn cancel_method_display() {
assert_eq!(CancelMethod::Terminate.to_string(), "Terminate");
assert_eq!(
CancelMethod::NotifyThenTerminate {
terminate_delay: Duration::from_secs(30)
}
.to_string(),
"NotifyThenTerminate(30s)"
);
}
}