use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use openjd_expr::function_library::FunctionLibrary;
use openjd_expr::path_mapping::PathMappingRule;
use openjd_model::job::{Environment, StepScript};
use openjd_model::symbol_table::SymbolTable;
use openjd_model::types::JobParameterValues;
use tokio_util::sync::CancellationToken;
use crate::action::{ActionMessage, ActionResult, ActionState};
use crate::action_status::ActionStatus;
use crate::cross_user_helper::run_via_helper;
/// Notify period, in seconds, written into a `NOTIFY_THEN_TERMINATE` cancel
/// command by `Session::cancel_action` when the caller supplies no time limit.
pub const DEFAULT_CANCEL_NOTIFY_PERIOD_SECS: u64 = 5;
#[cfg(unix)]
use crate::cross_user_helper::CrossUserHelper;
#[cfg(windows)]
use crate::cross_user_helper::CrossUserHelperWin;
use crate::error::SessionError;
use crate::logging::{log_section_banner, LogContent};
use crate::runner::env_script::EnvironmentScriptRunner;
use crate::runner::step_script::StepScriptRunner;
use crate::session_log;
use crate::session_user::SessionUser;
/// Lifecycle state of a session.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionState {
    /// Idle; entering environments, running tasks and running subprocesses
    /// all require this state.
    Ready,
    /// An action is currently in flight.
    Running,
    /// A cancel was requested for the in-flight action.
    Canceling,
    /// Only environment exits are accepted from here on.
    ReadyEnding,
    /// `cleanup` has run; the session is finished.
    Ended,
}

impl std::fmt::Display for SessionState {
    /// Writes the upper-snake-case name of the state (e.g. `READY_ENDING`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Ready => "READY",
            Self::Running => "RUNNING",
            Self::Canceling => "CANCELING",
            Self::ReadyEnding => "READY_ENDING",
            Self::Ended => "ENDED",
        };
        f.write_str(label)
    }
}
/// Key under which an entered environment is tracked by the session.
pub type EnvironmentIdentifier = String;
/// Status callback; it is invoked with the session id and the current
/// [`ActionStatus`].
pub type SessionCallbackType = Box<dyn Fn(&str, &ActionStatus) + Send + Sync>;
/// Inputs consumed by `Session::with_config`.
pub struct SessionConfig {
/// Session identifier; used in log records and passed to the callback.
/// It is also handed to `TempDir::new` when the working directory is created.
pub session_id: String,
/// Job parameter values stored on the session.
pub job_parameter_values: JobParameterValues,
/// Path mapping rules; `None` is treated as an empty list.
pub path_mapping_rules: Option<Vec<PathMappingRule>>,
/// When `true`, `cleanup` leaves the working directory on disk.
pub retain_working_dir: bool,
/// Optional status callback.
pub callback: Option<SessionCallbackType>,
/// Base process environment for the session; `None` is treated as empty.
pub os_env_vars: Option<HashMap<String, String>>,
/// Directory under which the working directory is created. When `None`,
/// the result of `crate::tempdir::openjd_temp_dir(None)` is used.
pub session_root_directory: Option<PathBuf>,
/// User to run work as. A cross-user helper is spawned when this user is
/// not the process user.
pub user: Option<Arc<dyn SessionUser>>,
/// Model profile used to derive the function library and to decide
/// whether redactions are enabled.
pub profile: Option<openjd_model::ModelProfile>,
/// Optional parent token; per-action cancel tokens are created as its children.
pub cancel_token: Option<CancellationToken>,
/// How a missing sticky bit on the root directory is handled (checked on Unix only).
pub sticky_bit_policy: crate::tempdir::StickyBitPolicy,
/// Forwarded to the runners and subprocess configuration.
pub debug_collect_stdout: bool,
/// Forwarded to the runners and action filters.
pub echo_openjd_directives: bool,
}
/// Renders an optional process exit code as `exit code: <n>`, using `N/A`
/// when no code is available.
fn format_exit_code(code: Option<i32>) -> String {
    let rendered = code.map_or_else(|| "N/A".to_string(), |c| c.to_string());
    format!("exit code: {rendered}")
}
/// Returns the canonical form of an environment variable name: upper-cased
/// on Windows, unchanged on every other platform.
fn normalize_env_key(name: &str) -> String {
    if cfg!(windows) {
        name.to_uppercase()
    } else {
        name.to_owned()
    }
}
/// Environment variable changes recorded for one environment, keyed by the
/// normalized variable name. `Some(value)` records a set; `None` presumably
/// records an unset — TODO confirm against the code that consumes it.
type EnvVarChanges = HashMap<String, Option<String>>;
/// Mutable status of the most recent action; `Session::action_status`
/// builds an [`ActionStatus`] snapshot from these fields.
struct ActionStatusFields {
// `None` until the first action starts; no snapshot is produced while it is `None`.
state: Option<ActionState>,
progress: Option<f64>,
status_message: Option<String>,
fail_message: Option<String>,
exit_code: Option<i32>,
started_at: Option<std::time::SystemTime>,
ended_at: Option<std::time::SystemTime>,
}
impl ActionStatusFields {
    /// Creates the "no action has run yet" status: every field is `None`.
    fn new() -> Self {
        Self {
            state: None,
            started_at: None,
            ended_at: None,
            progress: None,
            status_message: None,
            fail_message: None,
            exit_code: None,
        }
    }

    /// Marks a new action as started: the state becomes `Running`, the start
    /// time is set to now, and every other field is cleared.
    fn reset(&mut self) {
        *self = Self {
            state: Some(ActionState::Running),
            started_at: Some(std::time::SystemTime::now()),
            ..Self::new()
        };
    }
}
/// Cancellation plumbing for the action currently in flight.
struct CancelFields {
// Token for the current action; cancelled by `cancel_action`.
token: Option<CancellationToken>,
// Channel on which `cancel_action` publishes the requested time limit.
request_tx: Option<tokio::sync::watch::Sender<Option<Duration>>>,
// Set when a cancel was requested with `mark_action_failed == true`.
mark_failed: bool,
// Optional session-wide token; per-action tokens are created as its children.
parent_token: Option<CancellationToken>,
}
impl CancelFields {
    /// Creates cancel state with no action in flight, remembering the
    /// optional session-wide parent token.
    fn new(parent_token: Option<CancellationToken>) -> Self {
        Self {
            parent_token,
            token: None,
            request_tx: None,
            mark_failed: false,
        }
    }
}
/// State used when work runs as a user other than the process user.
struct CrossUserFields {
// User that actions run as, if one was configured.
user: Option<Arc<dyn SessionUser>>,
// Directory holding the helper binary; set only when a helper was spawned.
helpers_dir: Option<PathBuf>,
// Helper process handle. Runners take it for the duration of an action and
// hand it back afterwards.
#[cfg(unix)]
helper: Option<CrossUserHelper>,
#[cfg(windows)]
helper: Option<CrossUserHelperWin>,
// Handle to which `cancel_action` writes newline-terminated JSON cancel commands.
cancel_writer: Option<std::fs::File>,
// Token included as the `"token"` field of each cancel command.
helper_auth_token: Option<String>,
}
/// Builds the expression function library for the given model profile and
/// path mapping rules.
///
/// With no profile, the current expression profile is used with the rules
/// attached as its host context.
fn derive_library(
    profile: Option<&openjd_model::ModelProfile>,
    rules: &Arc<Vec<PathMappingRule>>,
) -> Arc<FunctionLibrary> {
    let host = openjd_expr::HostContext::WithRules(Arc::clone(rules));
    let expr_profile = if let Some(model_profile) = profile {
        model_profile.to_expr_profile(host)
    } else {
        openjd_expr::ExprProfile::current().with_host_context(host)
    };
    openjd_expr::FunctionLibrary::for_profile(&expr_profile)
}
/// A session: a working directory plus the environments, environment
/// variables and action state needed to run actions inside it.
pub struct Session {
// Identifier used in log records and passed to the callback.
session_id: String,
// Current lifecycle state.
state: SessionState,
// Set once an environment is exited with `keep_session_running == false`.
ending_only: bool,
working_directory: PathBuf,
files_directory: PathBuf,
// When true, `cleanup` leaves the working directory on disk.
retain_working_dir: bool,
// Guards `cleanup` so only the first call does any work.
cleanup_called: bool,
// Owning handles for the directories above; `None` for test sessions.
_working_dir: Option<crate::tempdir::TempDir>,
_files_dir: Option<crate::tempdir::TempDir>,
// Entered environments by identifier.
environments: HashMap<EnvironmentIdentifier, Environment>,
// Identifiers in entry order; environments must be exited last-in-first-out.
environments_entered: Vec<EnvironmentIdentifier>,
// Variables defined by entered environments, keyed by normalized name.
env_vars: HashMap<String, String>,
// Base process environment supplied through the config.
process_env: HashMap<String, String>,
// Per-environment record of the variables each environment set.
created_env_vars: HashMap<EnvironmentIdentifier, EnvVarChanges>,
// Function library derived from `profile` and `path_mapping_rules`.
library: Arc<FunctionLibrary>,
// Sorted by descending source path length.
path_mapping_rules: Arc<Vec<PathMappingRule>>,
job_parameter_values: JobParameterValues,
// Status of the most recent action.
action: ActionStatusFields,
cancel: CancelFields,
cross_user: CrossUserFields,
callback: Option<SessionCallbackType>,
// Strings masked by `redact` and handed to runners as initial redactions.
redacted_values: HashSet<String>,
profile: Option<openjd_model::ModelProfile>,
debug_collect_stdout: bool,
echo_openjd_directives: bool,
}
impl Session {
/// Builds a minimal `Ready` session rooted at `working_directory` for tests.
///
/// No directories are created, no helper is spawned, there are no path
/// mapping rules and no profile; stdout collection and directive echoing
/// are both switched on.
#[cfg(any(test, feature = "test-utils"))]
pub fn new_for_test(working_directory: PathBuf) -> Self {
    let files_directory = working_directory.join("embedded_files");
    let rules: Arc<Vec<PathMappingRule>> = Arc::new(Vec::new());
    let library = derive_library(None, &rules);
    let cross_user = CrossUserFields {
        user: None,
        helpers_dir: None,
        helper: None,
        cancel_writer: None,
        helper_auth_token: None,
    };
    Self {
        session_id: String::new(),
        state: SessionState::Ready,
        ending_only: false,
        working_directory,
        files_directory,
        retain_working_dir: false,
        cleanup_called: false,
        _working_dir: None,
        _files_dir: None,
        environments: HashMap::new(),
        environments_entered: Vec::new(),
        env_vars: HashMap::new(),
        process_env: HashMap::new(),
        created_env_vars: HashMap::new(),
        library,
        path_mapping_rules: rules,
        job_parameter_values: HashMap::new(),
        action: ActionStatusFields::new(),
        cancel: CancelFields::new(None),
        cross_user,
        callback: None,
        redacted_values: HashSet::new(),
        profile: None,
        debug_collect_stdout: true,
        echo_openjd_directives: true,
    }
}
/// Test hook: installs the file that `cancel_action` writes cancel commands to.
#[cfg(any(test, feature = "test-utils"))]
pub fn set_cancel_writer_for_test(&mut self, writer: std::fs::File) {
self.cross_user.cancel_writer = Some(writer);
}
/// Test hook: sets the auth token embedded in cancel commands.
#[cfg(any(test, feature = "test-utils"))]
pub fn set_helper_auth_token_for_test(&mut self, token: String) {
self.cross_user.helper_auth_token = Some(token);
}
/// Test hook: forces the session into `state` without any transition checks.
#[cfg(any(test, feature = "test-utils"))]
pub fn set_state_for_test(&mut self, state: SessionState) {
self.state = state;
}
/// Creates a session from `config`.
///
/// Creates the working directory and its `embedded_files` subdirectory,
/// sorts the path mapping rules, derives the function library, spawns a
/// cross-user helper when `config.user` is not the process user, and logs
/// host and directory information.
///
/// # Errors
///
/// Returns `SessionError::PathPermissions` on Unix when the sticky-bit
/// policy is `Strict` and `find_missing_sticky_bit` reports a path.
/// Errors from temp-directory creation and from helper setup/spawn are
/// propagated.
pub fn with_config(mut config: SessionConfig) -> Result<Self, SessionError> {
// Use the configured root, or whatever `openjd_temp_dir(None)` selects.
let root_dir = match &config.session_root_directory {
Some(d) => d.clone(),
None => crate::tempdir::openjd_temp_dir(None)?,
};
#[cfg(unix)]
{
use crate::tempdir::StickyBitPolicy;
match config.sticky_bit_policy {
// Strict: a missing sticky bit is a hard error.
StickyBitPolicy::Strict => {
if let Some(path) = crate::tempdir::find_missing_sticky_bit(&root_dir) {
return Err(SessionError::PathPermissions {
path: path.display().to_string(),
reason: format!(
"Directory is world-writable without the sticky bit set. \
This allows other users to modify or delete session files. \
Set the sticky bit (chmod +t {}) or use \
StickyBitPolicy::Warn to override.",
path.display()
),
});
}
}
// Warn: log the problem and carry on.
StickyBitPolicy::Warn => {
if let Some(path) = crate::tempdir::find_missing_sticky_bit(&root_dir) {
log::warn!(
target: "openjd.sessions",
"Sticky bit is not set on {}. This may pose a risk when running \
work on this host as users may modify or delete files in this \
directory which do not belong to them.",
path.display()
);
}
}
StickyBitPolicy::Disabled => {}
}
}
let working_dir = crate::tempdir::TempDir::new(
Some(&root_dir),
Some(&config.session_id),
config.user.as_deref(),
)?;
// The embedded-files directory lives inside the working directory.
let files_dir = crate::tempdir::TempDir::new(
Some(working_dir.path()),
Some("embedded_files"),
config.user.as_deref(),
)?;
let working_directory = working_dir.path().to_path_buf();
let files_directory = files_dir.path().to_path_buf();
let mut path_mapping_rules = config.path_mapping_rules.unwrap_or_default();
// Longest source path first.
path_mapping_rules.sort_by_key(|r| std::cmp::Reverse(r.source_path.len()));
let path_mapping_rules = Arc::new(path_mapping_rules);
let profile = config.profile.take();
let library = derive_library(profile.as_ref(), &path_mapping_rules);
let process_env = config.os_env_vars.unwrap_or_default();
let mut helpers_dir = None;
// A helper is only needed when a user is configured and it is not the
// process user; otherwise all three values stay `None`.
#[cfg(unix)]
let (helper, cancel_writer, helper_auth_token) = if let Some(ref user) = config.user {
if !user.is_process_user() {
let hdir = crate::helper_binary::create_helpers_dir(
&working_directory,
Some(user.as_ref()),
)?;
let helper_path = crate::helper_binary::write_helper(&hdir, user.as_ref())?;
let (h, cw) = CrossUserHelper::spawn(&helper_path, user.as_ref())?;
let token = h.auth_token().to_string();
helpers_dir = Some(hdir);
(Some(h), Some(cw), Some(token))
} else {
(None, None, None)
}
} else {
(None, None, None)
};
// Windows variant of the block above; only the helper type differs.
#[cfg(windows)]
let (helper, cancel_writer, helper_auth_token) = if let Some(ref user) = config.user {
if !user.is_process_user() {
let hdir = crate::helper_binary::create_helpers_dir(
&working_directory,
Some(user.as_ref()),
)?;
let helper_path = crate::helper_binary::write_helper(&hdir, user.as_ref())?;
let (h, cw) = CrossUserHelperWin::spawn(&helper_path, user.as_ref())?;
let token = h.auth_token().to_string();
helpers_dir = Some(hdir);
(Some(h), Some(cw), Some(token))
} else {
(None, None, None)
}
} else {
(None, None, None)
};
session_log!(
info,
&config.session_id,
LogContent::HOST_INFO,
"openjd-sessions Library Version: {}",
env!("CARGO_PKG_VERSION")
);
session_log!(
info,
&config.session_id,
LogContent::HOST_INFO,
"Platform: {}",
std::env::consts::OS
);
session_log!(
info,
&config.session_id,
LogContent::HOST_INFO,
"Architecture: {}",
std::env::consts::ARCH
);
log::info!(target: "openjd.sessions", session_id = config.session_id.as_str(); "Initializing Open Job Description Session: {}", &config.session_id);
session_log!(
info,
&config.session_id,
LogContent::FILE_PATH,
"Session Working Directory: {}",
working_directory.display()
);
session_log!(
info,
&config.session_id,
LogContent::FILE_PATH,
"Session's Embedded Files Directory: {}",
files_directory.display()
);
Ok(Self {
session_id: config.session_id,
state: SessionState::Ready,
ending_only: false,
working_directory,
files_directory,
retain_working_dir: config.retain_working_dir,
cleanup_called: false,
_working_dir: Some(working_dir),
_files_dir: Some(files_dir),
environments: HashMap::new(),
environments_entered: Vec::new(),
env_vars: HashMap::new(),
process_env,
created_env_vars: HashMap::new(),
library,
path_mapping_rules,
job_parameter_values: config.job_parameter_values,
action: ActionStatusFields::new(),
cancel: CancelFields::new(config.cancel_token),
cross_user: CrossUserFields {
user: config.user,
helpers_dir,
helper,
cancel_writer,
helper_auth_token,
},
callback: config.callback,
redacted_values: HashSet::new(),
profile,
debug_collect_stdout: config.debug_collect_stdout,
echo_openjd_directives: config.echo_openjd_directives,
})
}
/// Builder-style setter: replaces the path mapping rules with `rules`
/// (sorted longest source path first) and rebuilds the function library.
pub fn with_path_mapping(mut self, mut rules: Vec<PathMappingRule>) -> Self {
    // Stable sort, descending by source path length.
    rules.sort_by(|a, b| b.source_path.len().cmp(&a.source_path.len()));
    let shared = Arc::new(rules);
    self.library = derive_library(self.profile.as_ref(), &shared);
    self.path_mapping_rules = shared;
    self
}
/// Appends `additional` to the existing path mapping rules, re-sorts the
/// combined list (longest source path first) and rebuilds the function
/// library.
pub fn extend_path_mapping_rules(&mut self, additional: Vec<PathMappingRule>) {
    let mut merged: Vec<PathMappingRule> = self
        .path_mapping_rules
        .iter()
        .cloned()
        .chain(additional)
        .collect();
    // Stable sort, descending by source path length.
    merged.sort_by(|a, b| b.source_path.len().cmp(&a.source_path.len()));
    let shared = Arc::new(merged);
    self.library = derive_library(self.profile.as_ref(), &shared);
    self.path_mapping_rules = shared;
}
/// Returns the session's path mapping rules in their stored (sorted) order.
pub fn path_mapping_rules(&self) -> &[PathMappingRule] {
&self.path_mapping_rules
}
/// Builder-style setter: installs `profile` and rebuilds the function
/// library from it and the current path mapping rules.
pub fn with_profile(mut self, profile: openjd_model::ModelProfile) -> Self {
self.profile = Some(profile);
self.library = derive_library(self.profile.as_ref(), &self.path_mapping_rules);
self
}
/// Reports whether redaction is enabled: a profile must be present and
/// either be newer than revision `V2023_09` or carry the `RedactedEnvVars`
/// extension.
fn redactions_enabled(&self) -> bool {
    use openjd_model::types::{ModelExtension, SpecificationRevision};

    self.profile.as_ref().map_or(false, |p| {
        p.revision() > SpecificationRevision::V2023_09
            || p.has_extension(ModelExtension::RedactedEnvVars)
    })
}
/// Returns the session's function library; always `Some`.
fn lib(&self) -> Option<&FunctionLibrary> {
Some(&self.library)
}
/// Invokes the callback with the current action status. Does nothing when
/// there is no callback or no action has been recorded yet.
fn notify_callback(&self) {
    let cb = match &self.callback {
        Some(cb) => cb,
        None => return,
    };
    if let Some(status) = self.action_status() {
        cb(&self.session_id, &status);
    }
}
/// Returns the names of the profile's extensions in sorted order, or an
/// empty list when the session has no profile.
pub fn get_enabled_extensions(&self) -> Vec<String> {
    let mut names = Vec::new();
    if let Some(profile) = &self.profile {
        names.extend(
            profile
                .extensions()
                .iter()
                .map(|e| e.as_str().to_string()),
        );
        names.sort();
    }
    names
}
/// Returns the session identifier.
pub fn session_id(&self) -> &str {
&self.session_id
}
/// Returns the current lifecycle state.
pub fn state(&self) -> SessionState {
self.state
}
/// Returns the session working directory.
pub fn working_directory(&self) -> &Path {
&self.working_directory
}
/// Returns the directory used for embedded files.
pub fn files_directory(&self) -> &Path {
&self.files_directory
}
/// Returns the identifiers of the currently entered environments, oldest first.
pub fn environments_entered(&self) -> &[EnvironmentIdentifier] {
&self.environments_entered
}
/// Returns a duplicate handle to the cancel writer, or `None` when there is
/// no writer or the handle cannot be duplicated.
pub fn clone_cancel_writer(&self) -> Option<std::fs::File> {
    let writer = self.cross_user.cancel_writer.as_ref()?;
    writer.try_clone().ok()
}
/// Returns a snapshot of the most recent action's status, or `None` when
/// no action has been recorded yet.
pub fn action_status(&self) -> Option<ActionStatus> {
    let state = self.action.state?;
    let fields = &self.action;
    Some(ActionStatus {
        state,
        progress: fields.progress,
        status_message: fields.status_message.clone(),
        fail_message: fields.fail_message.clone(),
        exit_code: fields.exit_code,
        started_at: fields.started_at,
        ended_at: fields.ended_at,
    })
}
/// Overwrites the recorded action state; no other status field and no
/// session state is touched, and the callback is not invoked.
pub fn override_action_state(&mut self, state: ActionState) {
self.action.state = Some(state);
}
/// Returns `text` with every registered redacted value replaced by
/// `********`.
///
/// Values are applied longest first so that a value containing another one
/// is masked as a whole. Empty values are skipped: an empty pattern matches
/// at every character boundary and would scatter the mask through the whole
/// text. Equal-length values are applied in lexical order so the result
/// does not depend on hash-set iteration order.
pub fn redact(&self, text: &str) -> String {
    let mut secrets: Vec<&str> = self
        .redacted_values
        .iter()
        .map(String::as_str)
        .filter(|s| !s.is_empty())
        .collect();
    secrets.sort_by(|a, b| b.len().cmp(&a.len()).then_with(|| a.cmp(b)));
    let mut result = text.to_string();
    for secret in secrets {
        result = result.replace(secret, "********");
    }
    result
}
/// Creates the cancel token for a new action: a child of the session-wide
/// parent token when one was configured, otherwise a fresh token.
fn new_action_cancel_token(&self) -> CancellationToken {
    self.cancel
        .parent_token
        .as_ref()
        .map_or_else(CancellationToken::new, CancellationToken::child_token)
}
pub fn cancel_action(
&mut self,
time_limit: Option<Duration>,
mark_action_failed: bool,
) -> Result<(), SessionError> {
if self.state != SessionState::Running {
return Err(SessionError::InvalidState {
expected: vec![SessionState::Running],
current: self.state,
});
}
self.state = SessionState::Canceling;
if mark_action_failed {
self.cancel.mark_failed = true;
}
if let Some(ref mut writer) = self.cross_user.cancel_writer {
use std::io::Write;
let is_terminate = matches!(time_limit, Some(d) if d.is_zero());
let token_field = match &self.cross_user.helper_auth_token {
Some(t) => format!(r#""token":"{t}","#),
None => String::new(),
};
let cmd = if is_terminate {
format!(r#"{{{token_field}"cancel":"TERMINATE"}}"#)
} else {
let notify_period = time_limit
.unwrap_or(Duration::from_secs(DEFAULT_CANCEL_NOTIFY_PERIOD_SECS))
.as_secs();
format!(
r#"{{{token_field}"cancel":"NOTIFY_THEN_TERMINATE","notifyPeriodInSeconds":{notify_period}}}"#
)
};
let _ = writer.write_all(cmd.as_bytes());
let _ = writer.write_all(b"\n");
let _ = writer.flush();
}
if let Some(tx) = &self.cancel.request_tx {
let _ = tx.send(time_limit);
}
if let Some(token) = &self.cancel.token {
token.cancel();
}
Ok(())
}
/// Tears the session down and moves it to `Ended`.
///
/// Only the first call does any work. A cross-user helper currently held
/// by the session is always shut down. The working directory is deleted
/// unless `retain_working_dir` is set; deletion is best-effort and
/// failures are ignored.
pub fn cleanup(&mut self) {
    if self.cleanup_called {
        return;
    }
    self.cleanup_called = true;

    let delete_working_dir = !self.retain_working_dir;
    if delete_working_dir {
        log_section_banner(&self.session_id, "Session Cleanup");
    }

    // Shut the helper down even when the working directory is retained:
    // once the session is `Ended` nothing can use the helper again, and it
    // would otherwise be left running.
    if let Some(ref mut helper) = self.cross_user.helper {
        helper.shutdown();
    }

    if delete_working_dir {
        session_log!(
            info,
            &self.session_id,
            LogContent::FILE_PATH,
            "Deleting working directory: {}",
            self.working_directory.display()
        );
        // First remove the directory's entries as the session user via
        // `sudo` — presumably because files that user created may not be
        // removable by this process (confirm). The outcome is ignored.
        #[cfg(unix)]
        if let Some(ref user) = self.cross_user.user {
            if !user.is_process_user() {
                if let Ok(entries) = std::fs::read_dir(&self.working_directory) {
                    let files: Vec<String> = entries
                        .filter_map(|e| e.ok())
                        .map(|e| e.path().to_string_lossy().to_string())
                        .collect();
                    if !files.is_empty() {
                        let mut args = vec![
                            "-u".to_string(),
                            user.user().to_string(),
                            "-i".to_string(),
                            "rm".to_string(),
                            "-rf".to_string(),
                            "--".to_string(),
                        ];
                        args.extend(files);
                        let _ = std::process::Command::new("sudo")
                            .args(&args)
                            .stdin(std::process::Stdio::null())
                            .stdout(std::process::Stdio::null())
                            .stderr(std::process::Stdio::null())
                            .status();
                    }
                }
            }
        }
        if let Some(ref mut files_dir) = self._files_dir {
            let _ = files_dir.cleanup();
        }
        if let Some(ref mut working_dir) = self._working_dir {
            let _ = working_dir.cleanup();
        } else {
            // No owning handle (e.g. a test session): remove the path directly.
            let _ = std::fs::remove_dir_all(&self.working_directory);
        }
    }
    self.state = SessionState::Ended;
}
/// Enters `env` and returns the identifier under which it was registered.
///
/// Thin wrapper over `enter_environment_with_output` that drops the
/// captured output; arguments and errors are the same.
pub async fn enter_environment(
    &mut self,
    env: &Environment,
    resolved_symtab: Option<&openjd_expr::SerializedSymbolTable>,
    identifier: Option<&str>,
    os_env_vars: Option<&HashMap<String, String>>,
) -> Result<String, SessionError> {
    self.enter_environment_with_output(env, resolved_symtab, identifier, os_env_vars)
        .await
        .map(|(id, _output)| id)
}
/// Enters `env` and returns `(identifier, output)`, where `output` is the
/// stdout reported by the `onEnter` action (empty when there is none).
///
/// The environment is registered and its variables are resolved and
/// recorded first. If it has an `onEnter` action, that action is run,
/// going through the active wrapping environment's `on_wrap_env_enter`
/// action when one applies.
///
/// All fallible preparation happens before the session switches to
/// `Running`, so a preparation error returns with the session state
/// unchanged instead of leaving it stuck in `Running`.
///
/// # Errors
///
/// * `InvalidState` unless the session is `Ready`.
/// * `DuplicateEnvironment` when `identifier` is already in use.
/// * `FormatString` when an environment variable fails to resolve.
/// * `EnvironmentScriptFailed` when the action does not end in `Success`.
/// * Any error from symbol-table construction, path-mapping
///   materialization, wrap-symbol seeding or the action driver.
pub async fn enter_environment_with_output(
    &mut self,
    env: &Environment,
    resolved_symtab: Option<&openjd_expr::SerializedSymbolTable>,
    identifier: Option<&str>,
    os_env_vars: Option<&HashMap<String, String>>,
) -> Result<(String, String), SessionError> {
    if self.state != SessionState::Ready {
        return Err(SessionError::InvalidState {
            expected: vec![SessionState::Ready],
            current: self.state,
        });
    }
    let symtab = self.build_symbol_table(None, resolved_symtab)?;
    let identifier = match identifier {
        Some(id) => {
            if self.environments.contains_key(id) {
                return Err(SessionError::DuplicateEnvironment { id: id.to_string() });
            }
            id.to_string()
        }
        None => format!("{}:{}", self.session_id, uuid::Uuid::new_v4().simple()),
    };
    self.environments.insert(identifier.clone(), env.clone());
    self.environments_entered.push(identifier.clone());
    self.created_env_vars
        .insert(identifier.clone(), HashMap::new());
    if let Some(vars) = &env.variables {
        for (key, fmt_str) in vars {
            let value = fmt_str
                .resolve_string_with(
                    &symtab,
                    &openjd_expr::FormatStringOptions::new().with_library(self.lib()),
                )
                .map_err(|e| SessionError::FormatString {
                    context: format!("env var '{key}'"),
                    reason: e.to_string(),
                })?;
            let norm_key = normalize_env_key(key);
            self.env_vars.insert(norm_key.clone(), value.clone());
            if let Some(changes) = self.created_env_vars.get_mut(&identifier) {
                changes.insert(norm_key, Some(value));
            }
        }
    }
    let output = if let Some(inner_on_enter) = env
        .script
        .as_ref()
        .and_then(|s| s.actions.on_enter.as_ref())
    {
        log_section_banner(
            &self.session_id,
            &format!("Entering Environment: {}", env.name),
        );

        // --- Fallible preparation: must finish before the state changes. ---
        let env_vars = self.evaluate_env_vars(os_env_vars);
        let mut action_symtab = symtab.clone();
        self.materialize_path_mapping(&mut action_symtab)?;
        // The environment being entered is excluded from the wrap lookup.
        let wrap_action = self.wrap_env_excluding(&identifier).and_then(|outer| {
            outer
                .script
                .as_ref()
                .and_then(|s| s.actions.on_wrap_env_enter.as_ref())
                .cloned()
                .map(|action| (outer.resolved_symtab.clone(), action))
        });
        let lib = self.library.clone();
        if let Some((wrap_symtab, _)) = wrap_action.as_ref() {
            seed_wrapped_action_symbols(
                &mut action_symtab,
                wrap_symtab,
                inner_on_enter,
                WrappedContext::Env(&env.name),
                &self.env_vars,
                Some(&lib),
                "onEnter",
            )?;
        }

        // --- Commit: the action is now considered started. ---
        self.action.reset();
        self.state = SessionState::Running;
        self.notify_callback();
        let cancel_token = self.new_action_cancel_token();
        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
        self.cancel.token = Some(cancel_token.clone());
        self.cancel.request_tx = Some(cancel_tx);

        let action_symtab = Box::new(action_symtab);
        let env_vars = Box::new(env_vars);
        #[allow(unused_mut)]
        let mut runner = EnvironmentScriptRunner::new(
            &self.session_id,
            self.working_directory.clone(),
            self.files_directory.clone(),
            self.cross_user.user.clone(),
        )
        .with_redactions(self.redactions_enabled())
        .with_debug_collect_stdout(self.debug_collect_stdout)
        .with_echo_openjd_directives(self.echo_openjd_directives)
        .with_initial_redacted_values(self.redacted_values.iter().cloned().collect())
        .with_cancel_token(cancel_token)
        .with_cancel_request_rx(cancel_rx);
        if let Some(ref hdir) = self.cross_user.helpers_dir {
            runner = runner.with_helpers_directory(hdir.clone());
        }
        // Lend the helper to the runner for this action; it is handed back below.
        let mut runner = match self.cross_user.helper.take() {
            Some(h) => {
                let r = runner.with_helper(h);
                match self
                    .cross_user
                    .cancel_writer
                    .as_ref()
                    .and_then(|f| f.try_clone().ok())
                {
                    Some(w) => r.with_cancel_writer(w),
                    None => r,
                }
            }
            None => runner,
        };
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let runner_fut: std::pin::Pin<Box<dyn std::future::Future<Output = _>>> =
            match wrap_action.as_ref() {
                Some((_, action)) => Box::pin(runner.run_wrap_action(
                    action,
                    &action_symtab,
                    Some(&lib),
                    &env_vars,
                    tx,
                    None,
                )),
                None => Box::pin(runner.enter(env, &action_symtab, Some(&lib), &env_vars, tx)),
            };
        let result = self.drive_action(runner_fut, &mut rx, &identifier).await;
        // Take the helper back before propagating any error.
        self.cross_user.helper = runner.take_helper();
        let result = result?;
        if result.state != ActionState::Success {
            return Err(SessionError::EnvironmentScriptFailed {
                name: env.name.clone(),
                action: "onEnter".into(),
                reason: format_exit_code(result.exit_code),
            });
        }
        result.stdout
    } else {
        // No onEnter action: record an immediate success.
        let now = std::time::SystemTime::now();
        self.action.state = Some(ActionState::Success);
        self.action.started_at = Some(now);
        self.action.progress = None;
        self.action.status_message = None;
        self.action.fail_message = None;
        self.action.exit_code = None;
        log_section_banner(
            &self.session_id,
            &format!("Entering Environment: {}", env.name),
        );
        self.action.ended_at = Some(std::time::SystemTime::now());
        self.notify_callback();
        String::new()
    };
    if self.state == SessionState::Running {
        self.state = SessionState::Ready;
    }
    Ok((identifier, output))
}
/// Exits the most recently entered environment and returns the stdout
/// reported by its `onExit` action (empty when there is none).
///
/// Environments must be exited in reverse order of entry. The environment
/// is removed from the session's bookkeeping before its action runs.
/// Passing `keep_session_running == false` marks the session as ending.
///
/// All fallible preparation happens before the session switches to
/// `Running`, so a preparation error returns with the session state
/// unchanged instead of leaving it stuck in `Running`.
///
/// # Errors
///
/// * `InvalidState` unless the session is `Ready` or `ReadyEnding`.
/// * `UnknownEnvironment` when `identifier` is not registered.
/// * `LifoViolation` when `identifier` is not the last entered environment.
/// * `EnvironmentScriptFailed` when the action does not end in `Success`;
///   the session is then `ReadyEnding`.
/// * Any error from symbol-table construction, path-mapping
///   materialization, wrap-symbol seeding or the action driver.
pub async fn exit_environment(
    &mut self,
    identifier: &EnvironmentIdentifier,
    resolved_symtab: Option<&openjd_expr::SerializedSymbolTable>,
    keep_session_running: bool,
    os_env_vars: Option<&HashMap<String, String>>,
) -> Result<String, SessionError> {
    if self.state != SessionState::Ready && self.state != SessionState::ReadyEnding {
        return Err(SessionError::InvalidState {
            expected: vec![SessionState::Ready, SessionState::ReadyEnding],
            current: self.state,
        });
    }
    let env = self
        .environments
        .get(identifier)
        .ok_or_else(|| SessionError::UnknownEnvironment {
            identifier: identifier.to_string(),
        })?
        .clone();
    if self.environments_entered.last() != Some(identifier) {
        return Err(SessionError::LifoViolation {
            expected: self
                .environments_entered
                .last()
                .cloned()
                .unwrap_or_default(),
            got: identifier.clone(),
        });
    }
    let symtab = self.build_symbol_table(None, resolved_symtab)?;
    let env_vars = Box::new(self.evaluate_env_vars(os_env_vars));
    if !keep_session_running {
        self.ending_only = true;
    }
    self.environments.remove(identifier);
    self.environments_entered.pop();
    let output = if let Some(inner_on_exit) = env
        .script
        .as_ref()
        .and_then(|s| s.actions.on_exit.as_ref())
    {
        log_section_banner(
            &self.session_id,
            &format!("Exiting Environment: {}", env.name),
        );

        // --- Fallible preparation: must finish before the state changes. ---
        let mut action_symtab = symtab.clone();
        self.materialize_path_mapping(&mut action_symtab)?;
        let wrap_action = self.active_wrap_env().and_then(|outer| {
            outer
                .script
                .as_ref()
                .and_then(|s| s.actions.on_wrap_env_exit.as_ref())
                .cloned()
                .map(|action| (outer.resolved_symtab.clone(), action))
        });
        let lib = self.library.clone();
        if let Some((wrap_symtab, _)) = wrap_action.as_ref() {
            seed_wrapped_action_symbols(
                &mut action_symtab,
                wrap_symtab,
                inner_on_exit,
                WrappedContext::Env(&env.name),
                &self.env_vars,
                Some(&lib),
                "onExit",
            )?;
        }

        // --- Commit: the action is now considered started. ---
        self.action.reset();
        self.state = SessionState::Running;
        self.notify_callback();
        let cancel_token = self.new_action_cancel_token();
        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
        self.cancel.token = Some(cancel_token.clone());
        self.cancel.request_tx = Some(cancel_tx);

        let action_symtab = Box::new(action_symtab);
        #[allow(unused_mut)]
        let mut runner = EnvironmentScriptRunner::new(
            &self.session_id,
            self.working_directory.clone(),
            self.files_directory.clone(),
            self.cross_user.user.clone(),
        )
        .with_redactions(self.redactions_enabled())
        .with_debug_collect_stdout(self.debug_collect_stdout)
        .with_echo_openjd_directives(self.echo_openjd_directives)
        .with_initial_redacted_values(self.redacted_values.iter().cloned().collect())
        .with_cancel_token(cancel_token)
        .with_cancel_request_rx(cancel_rx);
        if let Some(ref hdir) = self.cross_user.helpers_dir {
            runner = runner.with_helpers_directory(hdir.clone());
        }
        // Lend the helper to the runner for this action; it is handed back below.
        let mut runner = match self.cross_user.helper.take() {
            Some(h) => {
                let r = runner.with_helper(h);
                match self
                    .cross_user
                    .cancel_writer
                    .as_ref()
                    .and_then(|f| f.try_clone().ok())
                {
                    Some(w) => r.with_cancel_writer(w),
                    None => r,
                }
            }
            None => runner,
        };
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let runner_fut: std::pin::Pin<Box<dyn std::future::Future<Output = _>>> =
            match wrap_action.as_ref() {
                Some((_, action)) => Box::pin(runner.run_wrap_action(
                    action,
                    &action_symtab,
                    Some(&lib),
                    &env_vars,
                    tx,
                    None,
                )),
                None => Box::pin(runner.exit(&env, &action_symtab, Some(&lib), &env_vars, tx)),
            };
        let result = self.drive_action(runner_fut, &mut rx, identifier).await;
        // Take the helper back before propagating any error.
        self.cross_user.helper = runner.take_helper();
        let result = result?;
        if result.state != ActionState::Success {
            self.state = SessionState::ReadyEnding;
            return Err(SessionError::EnvironmentScriptFailed {
                name: env.name.clone(),
                action: "onExit".into(),
                reason: format_exit_code(result.exit_code),
            });
        }
        result.stdout
    } else {
        // No onExit action: record an immediate success.
        let now = std::time::SystemTime::now();
        self.action.state = Some(ActionState::Success);
        self.action.started_at = Some(now);
        self.action.progress = None;
        self.action.status_message = None;
        self.action.fail_message = None;
        self.action.exit_code = None;
        self.state = if self.ending_only {
            SessionState::ReadyEnding
        } else {
            SessionState::Ready
        };
        log_section_banner(
            &self.session_id,
            &format!("Exiting Environment: {}", env.name),
        );
        self.action.ended_at = Some(std::time::SystemTime::now());
        self.notify_callback();
        String::new()
    };
    Ok(output)
}
/// Runs a step script's task action and returns its result.
///
/// When the active wrapping environment defines `on_wrap_task_run`, that
/// action is run in place of the script's own `on_run`, after the wrapped
/// action's symbols have been seeded into the symbol table.
///
/// All fallible preparation happens before the session switches to
/// `Running`, so a preparation error returns with the session still
/// `Ready` instead of leaving it stuck in `Running`.
///
/// # Errors
///
/// * `InvalidState` unless the session is `Ready`.
/// * Any error from symbol-table construction, path-mapping
///   materialization, wrap-symbol seeding or the action driver.
pub async fn run_task(
    &mut self,
    step_name: &str,
    script: &StepScript,
    task_parameter_values: Option<&openjd_model::types::TaskParameterSet>,
    resolved_symtab: Option<&openjd_expr::SerializedSymbolTable>,
    os_env_vars: Option<&HashMap<String, String>>,
) -> Result<ActionResult, SessionError> {
    if self.state != SessionState::Ready {
        return Err(SessionError::InvalidState {
            expected: vec![SessionState::Ready],
            current: self.state,
        });
    }
    let symtab = self.build_symbol_table(task_parameter_values, resolved_symtab)?;
    log_section_banner(&self.session_id, "Running Task");

    // --- Fallible preparation: must finish before the state changes. ---
    let env_vars = self.evaluate_env_vars(os_env_vars);
    let mut action_symtab = symtab.clone();
    self.materialize_path_mapping(&mut action_symtab)?;
    let lib = self.library.clone();
    let wrap_action: Option<openjd_model::job::Action> = self
        .active_wrap_env()
        .and_then(|wrap_env| {
            wrap_env
                .script
                .as_ref()
                .and_then(|s| s.actions.on_wrap_task_run.clone())
                .map(|action| (wrap_env.resolved_symtab.clone(), action))
        })
        .map(|(wrap_symtab, action)| {
            seed_wrapped_action_symbols(
                &mut action_symtab,
                &wrap_symtab,
                &script.actions.on_run,
                WrappedContext::Step(step_name),
                &self.env_vars,
                Some(&lib),
                "task",
            )?;
            Ok::<_, SessionError>(action)
        })
        .transpose()?;

    // --- Commit: the action is now considered started. ---
    self.action.reset();
    self.state = SessionState::Running;
    self.notify_callback();
    let cancel_token = self.new_action_cancel_token();
    let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
    self.cancel.token = Some(cancel_token.clone());
    self.cancel.request_tx = Some(cancel_tx);

    let action_symtab = Box::new(action_symtab);
    let env_vars = Box::new(env_vars);
    #[allow(unused_mut)]
    let mut runner = StepScriptRunner::new(
        &self.session_id,
        self.working_directory.clone(),
        self.files_directory.clone(),
        self.cross_user.user.clone(),
    )
    .with_redactions(self.redactions_enabled())
    .with_debug_collect_stdout(self.debug_collect_stdout)
    .with_echo_openjd_directives(self.echo_openjd_directives)
    .with_initial_redacted_values(self.redacted_values.iter().cloned().collect())
    .with_cancel_token(cancel_token)
    .with_cancel_request_rx(cancel_rx);
    if let Some(ref hdir) = self.cross_user.helpers_dir {
        runner = runner.with_helpers_directory(hdir.clone());
    }
    // Lend the helper to the runner for this action; it is handed back below.
    let mut runner = match self.cross_user.helper.take() {
        Some(h) => {
            let r = runner.with_helper(h);
            match self
                .cross_user
                .cancel_writer
                .as_ref()
                .and_then(|f| f.try_clone().ok())
            {
                Some(w) => r.with_cancel_writer(w),
                None => r,
            }
        }
        None => runner,
    };
    let step_identifier = format!("{}:step:{}", self.session_id, uuid::Uuid::new_v4().simple());
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    // With a wrap action, run a copy of the script whose `on_run` is the
    // wrap action; let bindings and embedded files are carried over.
    let effective_script: std::borrow::Cow<'_, StepScript> = match wrap_action {
        Some(action) => std::borrow::Cow::Owned(StepScript {
            let_bindings: script.let_bindings.clone(),
            actions: openjd_model::job::StepActions { on_run: action },
            embedded_files: script.embedded_files.clone(),
        }),
        None => std::borrow::Cow::Borrowed(script),
    };
    let runner_fut = Box::pin(runner.run(
        effective_script.as_ref(),
        &action_symtab,
        Some(&lib),
        &env_vars,
        tx,
    ));
    let result = self
        .drive_action(runner_fut, &mut rx, &step_identifier)
        .await;
    // Take the helper back before propagating any error.
    self.cross_user.helper = runner.take_helper();
    let result = result?;
    Ok(ActionResult {
        state: result.state,
        exit_code: result.exit_code,
        stdout: result.stdout,
    })
}
/// Runs an ad-hoc command inside the session and returns its result.
///
/// With `use_session_env_vars` the environment comes from
/// `evaluate_env_vars`; otherwise it is the base process environment
/// overlaid with `os_env_vars`. When the session currently holds a
/// cross-user helper, the command is run through it.
///
/// # Errors
///
/// * `InvalidState` unless the session is `Ready`.
/// * `Runtime` when `command` is empty or only whitespace, or when
///   `timeout` is zero.
/// * Any error from the helper or the action driver.
pub async fn run_subprocess(
    &mut self,
    command: &str,
    args: Option<&[String]>,
    timeout: Option<Duration>,
    os_env_vars: Option<&HashMap<String, String>>,
    use_session_env_vars: bool,
    log_banner_message: Option<&str>,
) -> Result<crate::subprocess::SubprocessResult, SessionError> {
    if !matches!(self.state, SessionState::Ready) {
        return Err(SessionError::InvalidState {
            expected: vec![SessionState::Ready],
            current: self.state,
        });
    }
    // An empty string also trims to empty, so one check covers both cases.
    if command.trim().is_empty() {
        return Err(SessionError::Runtime(
            "command must be a non-empty string".into(),
        ));
    }
    if timeout.map_or(false, |t| t.is_zero()) {
        return Err(SessionError::Runtime("timeout must be positive".into()));
    }
    if let Some(banner) = log_banner_message {
        log_section_banner(&self.session_id, banner);
    }
    self.action.reset();
    self.state = SessionState::Running;
    self.notify_callback();

    let env_vars = if use_session_env_vars {
        self.evaluate_env_vars(os_env_vars)
    } else {
        // Base environment first; caller-supplied entries override it.
        self.process_env
            .iter()
            .chain(os_env_vars.into_iter().flatten())
            .map(|(k, v)| (k.clone(), Some(v.clone())))
            .collect::<HashMap<String, Option<String>>>()
    };
    let cmd_args: Vec<String> = std::iter::once(command.to_string())
        .chain(args.unwrap_or(&[]).iter().cloned())
        .collect();

    if self.cross_user.helper.is_some() {
        return self
            .run_subprocess_via_helper(&cmd_args, env_vars, timeout)
            .await;
    }

    let cancel_token = self.new_action_cancel_token();
    let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
    self.cancel.token = Some(cancel_token.clone());
    self.cancel.request_tx = Some(cancel_tx);
    let config = crate::subprocess::SubprocessConfig {
        args: cmd_args,
        env_vars,
        working_dir: Some(self.working_directory.clone()),
        timeout,
        user: self.cross_user.user.clone(),
        cancel_method: crate::runner::CancelMethod::Terminate,
        cancel_request_rx: Some(cancel_rx),
        debug_collect_stdout: self.debug_collect_stdout,
    };
    let mut filter = crate::action_filter::ActionFilter::new(
        &self.session_id,
        self.echo_openjd_directives,
        false,
    );
    let subprocess_identifier = format!(
        "{}:subprocess:{}",
        self.session_id,
        uuid::Uuid::new_v4().simple()
    );
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    // Own the id separately so the future does not borrow `self`.
    let sid = self.session_id.clone();
    let runner_fut = Box::pin(crate::subprocess::run_subprocess(
        config,
        &mut filter,
        &sid,
        tx,
        cancel_token,
    ));
    self.drive_action(runner_fut, &mut rx, &subprocess_identifier)
        .await
}
/// Runs a subprocess through the cross-user helper and records the outcome
/// on the session.
///
/// The caller must have checked that a helper is present and must already
/// have moved the session to `Running`. Action messages are applied only
/// after the helper call returns. On completion the cancel state is
/// cleared and the session becomes `Ready` on success, `ReadyEnding`
/// otherwise.
///
/// # Errors
///
/// Propagates any error from `run_via_helper`. In that case the cancel
/// state is cleared and the session is moved to `ReadyEnding` rather than
/// being left in `Running`; the recorded action status is not modified and
/// the callback is not invoked.
///
/// # Panics
///
/// Panics if the session holds no helper.
async fn run_subprocess_via_helper(
    &mut self,
    args: &[String],
    env_vars: std::collections::HashMap<String, Option<String>>,
    timeout: Option<Duration>,
) -> Result<crate::subprocess::SubprocessResult, SessionError> {
    let mut filter = crate::action_filter::ActionFilter::new(
        &self.session_id,
        self.echo_openjd_directives,
        false,
    );
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    let subprocess_identifier = format!(
        "{}:subprocess:{}",
        self.session_id,
        uuid::Uuid::new_v4().simple()
    );
    let config = crate::subprocess::SubprocessConfig {
        args: args.to_vec(),
        env_vars,
        working_dir: Some(self.working_directory.clone()),
        timeout,
        user: self.cross_user.user.clone(),
        cancel_method: crate::runner::CancelMethod::Terminate,
        cancel_request_rx: None,
        debug_collect_stdout: self.debug_collect_stdout,
    };
    let helper = self
        .cross_user
        .helper
        .as_mut()
        .expect("caller checked helper.is_some()");
    let result = run_via_helper(
        helper,
        &config,
        &mut filter,
        &self.session_id,
        tx,
        self.cross_user.cancel_writer.as_ref(),
    )
    .await;
    // Apply whatever the helper reported, even if the call itself failed.
    while let Ok(msg) = rx.try_recv() {
        self.apply_message(msg, &subprocess_identifier);
    }
    // Whatever the outcome, the action is over: drop its cancel state.
    self.cancel.token = None;
    self.cancel.request_tx = None;
    self.cancel.mark_failed = false;
    let r = match result {
        Ok(r) => r,
        Err(err) => {
            // Treat a helper error like any other failed action so the
            // session is not stranded in `Running`.
            self.state = SessionState::ReadyEnding;
            return Err(err);
        }
    };
    self.action.state = Some(r.state);
    self.action.ended_at = Some(std::time::SystemTime::now());
    self.action.exit_code = r.exit_code;
    self.state = if r.state == ActionState::Success {
        SessionState::Ready
    } else {
        SessionState::ReadyEnding
    };
    self.notify_callback();
    Ok(r)
}
async fn drive_action<F>(
&mut self,
action_fut: F,
rx: &mut tokio::sync::mpsc::UnboundedReceiver<ActionMessage>,
identifier: &str,
) -> Result<crate::subprocess::SubprocessResult, SessionError>
where
F: std::future::Future<Output = Result<crate::subprocess::SubprocessResult, SessionError>>,
{
tokio::pin!(action_fut);
let mut result = None;
loop {
tokio::select! {
biased;
msg = rx.recv(), if result.is_none() => {
if let Some(msg) = msg { self.apply_message(msg, identifier) }
}
r = &mut action_fut, if result.is_none() => {
result = Some(r);
}
else => break,
}
if result.is_some() {
while let Ok(msg) = rx.try_recv() {
self.apply_message(msg, identifier);
}
break;
}
}
let r = match result.expect("loop guarantees result is Some") {
Ok(r) => r,
Err(e) => {
self.action.state = Some(ActionState::Failed);
self.action.ended_at = Some(std::time::SystemTime::now());
self.action.exit_code = None;
self.cancel.token = None;
self.cancel.request_tx = None;
self.cancel.mark_failed = false;
self.state = SessionState::ReadyEnding;
if let Some(cb) = &self.callback {
if let Some(status) = self.action_status() {
cb(&self.session_id, &status);
}
}
return Err(e);
}
};
let final_state = if self.cancel.mark_failed && r.state == ActionState::Canceled {
ActionState::Failed
} else {
r.state
};
self.action.state = Some(final_state);
self.action.ended_at = Some(std::time::SystemTime::now());
self.action.exit_code = r.exit_code;
self.cancel.token = None;
self.cancel.request_tx = None;
self.cancel.mark_failed = false;
self.state = if self.ending_only || final_state != ActionState::Success {
SessionState::ReadyEnding
} else {
SessionState::Ready
};
if let Some(cb) = &self.callback {
if let Some(status) = self.action_status() {
cb(&self.session_id, &status);
}
}
Ok(crate::subprocess::SubprocessResult {
state: final_state,
exit_code: r.exit_code,
stdout: r.stdout,
})
}
/// Applies one [`ActionMessage`] emitted by a running action to the session
/// and then notifies the callback with the current action status.
///
/// Environment-variable messages update `self.env_vars` under the key
/// returned by `normalize_env_key`. When `identifier` has an entry in
/// `self.created_env_vars`, the change is also recorded there (`None` marks
/// an unset).
fn apply_message(&mut self, msg: ActionMessage, identifier: &str) {
    match msg {
        ActionMessage::Progress(percent) => self.action.progress = Some(percent),
        ActionMessage::Status(text) => self.action.status_message = Some(text),
        ActionMessage::Fail(text) => self.action.fail_message = Some(text),
        ActionMessage::SetEnv { name, value } => {
            let key = normalize_env_key(&name);
            if let Some(recorded) = self.created_env_vars.get_mut(identifier) {
                recorded.insert(key.clone(), Some(value.clone()));
            }
            self.env_vars.insert(key, value);
        }
        ActionMessage::UnsetEnv { name } => {
            let key = normalize_env_key(&name);
            self.env_vars.remove(&key);
            if let Some(recorded) = self.created_env_vars.get_mut(identifier) {
                recorded.insert(key, None);
            }
        }
        ActionMessage::RedactedEnv { name, value } => {
            // The variable itself is only set when redactions are enabled;
            // the value is added to the redaction set either way.
            if self.redactions_enabled() {
                let key = normalize_env_key(&name);
                if let Some(recorded) = self.created_env_vars.get_mut(identifier) {
                    recorded.insert(key.clone(), Some(value.clone()));
                }
                self.env_vars.insert(key, value.clone());
            }
            self.redacted_values.insert(value);
        }
        ActionMessage::CancelMarkFailed { fail_message } => {
            self.action.fail_message = Some(fail_message);
            // Best effort: the result of the cancel request is ignored.
            let _ = self.cancel_action(None, true);
        }
    }

    if let Some(notify) = self.callback.as_ref() {
        if let Some(status) = self.action_status() {
            notify(&self.session_id, &status);
        }
    }
}
/// Writes the session's path mapping rules to a JSON file in the working
/// directory and exposes them through the symbol table.
///
/// Sets `Session.HasPathMappingRules` (bool) and `Session.PathMappingRulesFile`
/// (host-format path of the written file). With no rules the file contains an
/// empty JSON object.
///
/// # Errors
/// `SessionError::WorkingDirectory` when the file cannot be written, or
/// `SessionError::Runtime` when a symbol cannot be set.
fn materialize_path_mapping(&self, symtab: &mut SymbolTable) -> Result<(), SessionError> {
    let has_rules = !self.path_mapping_rules.is_empty();

    let document = if has_rules {
        let rules: Vec<serde_json::Value> = self
            .path_mapping_rules
            .iter()
            .map(|rule| {
                let source_format = match rule.source_path_format {
                    openjd_expr::path_mapping::PathFormat::Posix => "POSIX",
                    openjd_expr::path_mapping::PathFormat::Windows => "WINDOWS",
                    openjd_expr::path_mapping::PathFormat::Uri => "URI",
                };
                serde_json::json!({
                    "source_path_format": source_format,
                    "source_path": &rule.source_path,
                    "destination_path": &rule.destination_path,
                })
            })
            .collect();
        serde_json::json!({"version": "pathmapping-1.0", "path_mapping_rules": rules})
    } else {
        serde_json::json!({})
    };
    let rules_json = document.to_string();

    let has_rules_value = openjd_expr::ExprValue::Bool(has_rules);
    symtab
        .set("Session.HasPathMappingRules", has_rules_value)
        .map_err(|e| SessionError::Runtime(format!("Failed to set HasPathMappingRules: {e}")))?;

    // A random suffix keeps repeated materializations from clobbering each other.
    let file_name = format!("pathmapping_{}.json", uuid::Uuid::new_v4().simple());
    let filename = self.working_directory.join(file_name);
    if let Err(source) = std::fs::write(&filename, &rules_json) {
        return Err(SessionError::WorkingDirectory {
            path: filename.clone(),
            source,
        });
    }

    let rules_file_value = openjd_expr::ExprValue::new_path(
        filename.to_string_lossy().into_owned(),
        openjd_expr::path_mapping::PathFormat::host(),
    );
    symtab
        .set("Session.PathMappingRulesFile", rules_file_value)
        .map_err(|e| SessionError::Runtime(format!("Failed to set PathMappingRulesFile: {e}")))?;
    Ok(())
}
/// Computes the environment for an action run inside this session.
///
/// Layers are applied in order, later ones overriding earlier ones:
/// 1. the captured process environment (keys passed through `normalize_env_key`),
/// 2. `OPENJD_SESSION_WORKING_DIR`, set to the session working directory,
/// 3. the caller-supplied `extra` variables (keys normalised the same way),
/// 4. the changes recorded for each entered environment, in entry order.
///
/// A value of `None` in the result marks a variable recorded as unset.
pub fn evaluate_env_vars(
    &self,
    extra: Option<&HashMap<String, String>>,
) -> HashMap<String, Option<String>> {
    let mut merged: HashMap<String, Option<String>> = HashMap::new();

    merged.extend(
        self.process_env
            .iter()
            .map(|(k, v)| (normalize_env_key(k), Some(v.clone()))),
    );

    let working_dir = self.working_directory.to_string_lossy().into_owned();
    merged.insert("OPENJD_SESSION_WORKING_DIR".to_string(), Some(working_dir));

    merged.extend(
        extra
            .into_iter()
            .flatten()
            .map(|(k, v)| (normalize_env_key(k), Some(v.clone()))),
    );

    let recorded_changes = self
        .environments_entered
        .iter()
        .filter_map(|id| self.created_env_vars.get(id))
        .flatten();
    merged.extend(recorded_changes.map(|(name, value)| (name.clone(), value.clone())));

    merged
}
/// Returns the job parameter values held by this session.
pub fn job_parameter_values(&self) -> &JobParameterValues {
    &self.job_parameter_values
}
pub fn build_symbol_table(
&self,
task_parameter_values: Option<&openjd_model::types::TaskParameterSet>,
base: Option<&openjd_expr::SerializedSymbolTable>,
) -> Result<SymbolTable, SessionError> {
use openjd_model::types::TaskParameterType;
let mut symtab = if let Some(base) = base {
let mut s = base
.to_symtab(openjd_expr::path_mapping::PathFormat::host())
.map_err(|e| {
SessionError::Runtime(format!("Failed to deserialize resolved_symtab: {e}"))
})?;
for (name, param) in &self.job_parameter_values {
use openjd_model::types::JobParameterType;
match param.param_type {
JobParameterType::Path => {
let raw = match ¶m.value {
openjd_expr::ExprValue::String(s) => s.as_str(),
openjd_expr::ExprValue::Path { value, .. } => value.as_str(),
_ => continue,
};
let mapped = self.apply_path_mapping_to_string(raw);
let key = format!("Param.{name}");
s.set(
&key,
openjd_expr::ExprValue::new_path(
mapped,
openjd_expr::path_mapping::PathFormat::host(),
),
)
.map_err(|e| SessionError::Runtime(format!("Failed to set {key}: {e}")))?;
}
JobParameterType::ListPath => {
if let openjd_expr::ExprValue::ListString(ref elements, _) = param.value {
let mapped: Vec<openjd_expr::ExprValue> = elements
.iter()
.map(|s| {
let m = self.apply_path_mapping_to_string(s);
openjd_expr::ExprValue::new_path(
m,
openjd_expr::path_mapping::PathFormat::host(),
)
})
.collect();
let key = format!("Param.{name}");
s.set(
&key,
openjd_expr::ExprValue::make_list(
mapped,
openjd_expr::ExprType::PATH,
)
.unwrap(),
)
.map_err(|e| {
SessionError::Runtime(format!("Failed to set {key}: {e}"))
})?;
}
}
_ => {}
}
}
s
} else {
let mut s = SymbolTable::new();
for (name, param) in &self.job_parameter_values {
let raw_key = format!("RawParam.{name}");
s.set(&raw_key, param.value.clone())
.map_err(|e| SessionError::Runtime(format!("Failed to set {raw_key}: {e}")))?;
let key = format!("Param.{name}");
use openjd_model::types::JobParameterType;
let mapped_value = match param.param_type {
JobParameterType::Path => {
let raw = match ¶m.value {
openjd_expr::ExprValue::String(s) => s.as_str(),
openjd_expr::ExprValue::Path { value, .. } => value.as_str(),
_ => "",
};
let mapped = self.apply_path_mapping_to_string(raw);
openjd_expr::ExprValue::new_path(
mapped,
openjd_expr::path_mapping::PathFormat::host(),
)
}
JobParameterType::ListPath => match ¶m.value {
openjd_expr::ExprValue::ListString(elements, _) => {
let mapped: Vec<openjd_expr::ExprValue> = elements
.iter()
.map(|s| {
let m = self.apply_path_mapping_to_string(s);
openjd_expr::ExprValue::new_path(
m,
openjd_expr::path_mapping::PathFormat::host(),
)
})
.collect();
openjd_expr::ExprValue::make_list(mapped, openjd_expr::ExprType::PATH)
.unwrap()
}
other => self.apply_path_mapping_to_value(other),
},
_ => self.apply_path_mapping_to_value(¶m.value),
};
s.set(&key, mapped_value)
.map_err(|e| SessionError::Runtime(format!("Failed to set {key}: {e}")))?;
}
s
};
let host_path_format = openjd_expr::path_mapping::PathFormat::host();
symtab
.set(
"Session.WorkingDirectory",
openjd_expr::ExprValue::new_path(
self.working_directory.to_string_lossy().to_string(),
host_path_format,
),
)
.map_err(|e| SessionError::Runtime(format!("Failed to set WorkingDirectory: {e}")))?;
if let Some(task_params) = task_parameter_values {
for (name, tv) in task_params {
let raw_key = format!("Task.RawParam.{name}");
symtab
.set(&raw_key, tv.value.clone())
.map_err(|e| SessionError::Runtime(format!("Failed to set {raw_key}: {e}")))?;
let key = format!("Task.Param.{name}");
let param_value = match tv.param_type {
TaskParameterType::Path => {
let s = match &tv.value {
openjd_expr::ExprValue::String(s) => s.as_str(),
openjd_expr::ExprValue::Path { value, .. } => value.as_str(),
_ => "",
};
let mapped = self.apply_path_mapping_to_string(s);
openjd_expr::ExprValue::new_path(
mapped,
openjd_expr::path_mapping::PathFormat::host(),
)
}
_ => tv.value.clone(),
};
symtab
.set(&key, param_value)
.map_err(|e| SessionError::Runtime(format!("Failed to set {key}: {e}")))?;
}
}
Ok(symtab)
}
/// Maps `path` with the first session path mapping rule that applies to it,
/// or returns `path` unchanged when no rule applies.
fn apply_path_mapping_to_string(&self, path: &str) -> String {
    self.path_mapping_rules
        .iter()
        .find_map(|rule| rule.apply(path))
        .unwrap_or_else(|| path.to_string())
}
/// Applies the session path mapping rules to a PATH or LIST[PATH] value.
///
/// A `Path` is rewritten by the first rule that applies and keeps its
/// original path format; it is returned as a clone when no rule applies.
/// Each element of a `ListPath` goes through `apply_rules` and keeps the
/// list's format. Any other kind of value is returned as a clone.
///
/// # Panics
/// Panics if `make_list` rejects the rebuilt path list.
fn apply_path_mapping_to_value(
    &self,
    value: &openjd_expr::ExprValue,
) -> openjd_expr::ExprValue {
    match value {
        openjd_expr::ExprValue::Path {
            value: raw, format, ..
        } => self
            .path_mapping_rules
            .iter()
            .find_map(|rule| rule.apply(raw))
            .map(|mapped| openjd_expr::ExprValue::new_path(mapped, *format))
            .unwrap_or_else(|| value.clone()),
        openjd_expr::ExprValue::ListPath(elements, fmt, _) => {
            let mut remapped: Vec<openjd_expr::ExprValue> = Vec::new();
            for element in elements.iter() {
                let mapped_element =
                    openjd_expr::path_mapping::apply_rules(&self.path_mapping_rules, element);
                remapped.push(openjd_expr::ExprValue::new_path(mapped_element, *fmt));
            }
            openjd_expr::ExprValue::make_list(remapped, openjd_expr::ExprType::PATH).unwrap()
        }
        _ => value.clone(),
    }
}
/// Returns the most recently entered environment that defines at least one
/// wrap hook, or `None` when no entered environment does.
fn active_wrap_env(&self) -> Option<&Environment> {
    self.environments_entered
        .iter()
        .rev()
        .filter_map(|id| self.environments.get(id))
        .find(|env| env_has_any_wrap_hook(env))
}
/// Returns the most recently entered environment, other than `self_id`, that
/// defines at least one wrap hook, or `None` when there is none.
fn wrap_env_excluding(&self, self_id: &str) -> Option<&Environment> {
    self.environments_entered
        .iter()
        .rev()
        .filter(|id| id.as_str() != self_id)
        .filter_map(|id| self.environments.get(id))
        .find(|env| env_has_any_wrap_hook(env))
}
}
/// Reports whether `env` has a script whose actions define any wrap hook.
/// An environment without a script has none.
fn env_has_any_wrap_hook(env: &Environment) -> bool {
    match env.script.as_ref() {
        Some(script) => script.actions.has_any_wrap_hook(),
        None => false,
    }
}
/// Identifies what a wrap hook is wrapping; the carried name is published to
/// the symbol table by `overlay_wrapped_action_symbols`.
pub(crate) enum WrappedContext<'a> {
/// Name published as `WrappedEnv.Name`.
Env(&'a str),
/// Name published as `WrappedStep.Name`.
Step(&'a str),
}
/// Seeds `action_symtab` with the symbols a wrap hook needs to describe the
/// action it wraps.
///
/// If `wrap_resolved` is present it is deserialized and merged into
/// `action_symtab`; a deserialization failure is logged and otherwise ignored.
/// The wrapped action's command/arguments and timeout are then resolved
/// against `action_symtab`, and the results are written as
/// `WrappedAction.Command`, `WrappedAction.Args`, `WrappedAction.Environment`
/// (`NAME=VALUE` entries sorted by name so the list is deterministic) and
/// `WrappedAction.Timeout` (whole seconds, `0` when the action has no timeout),
/// together with `WrappedEnv.Name` or `WrappedStep.Name` from `context`.
///
/// `phase` is only used to label error contexts.
///
/// # Errors
/// `SessionError::FormatString` when the command or timeout cannot be
/// resolved; `SessionError::Runtime` when a symbol cannot be set.
fn seed_wrapped_action_symbols(
    action_symtab: &mut SymbolTable,
    wrap_resolved: &Option<openjd_expr::SerializedSymbolTable>,
    wrapped_action: &openjd_model::job::Action,
    context: WrappedContext<'_>,
    session_env_vars: &HashMap<String, String>,
    lib: Option<&FunctionLibrary>,
    phase: &str,
) -> Result<(), SessionError> {
    if let Some(ser) = wrap_resolved.as_ref() {
        match ser.to_symtab(openjd_expr::path_mapping::PathFormat::host()) {
            Ok(st) => action_symtab.merge_from(&st),
            Err(e) => {
                // Best effort: the hook still runs, just without these symbols.
                log::warn!(
                    target: "openjd.sessions",
                    "wrap env resolved_symtab deserialize failed: {e}; \
                     WrappedAction.* continues without it"
                );
            }
        }
    }

    let resolved_cmd = crate::runner::resolve_action_args(wrapped_action, action_symtab, lib)
        .map_err(|e| SessionError::FormatString {
            context: format!("wrapped {phase} command"),
            reason: e.to_string(),
        })?;
    let (cmd, args) = match resolved_cmd.split_first() {
        Some((head, tail)) => (head.clone(), tail.to_vec()),
        None => (String::new(), Vec::new()),
    };

    // HashMap iteration order is unspecified; sort by name so the exposed
    // list is the same from run to run.
    let mut env_pairs: Vec<(&String, &String)> = session_env_vars.iter().collect();
    env_pairs.sort_unstable();
    let wrapped_env: Vec<String> = env_pairs
        .into_iter()
        .map(|(k, v)| format!("{k}={v}"))
        .collect();

    let wrapped_timeout_secs =
        crate::runner::resolve_action_timeout(wrapped_action, action_symtab, lib, None)
            .map_err(|e| SessionError::FormatString {
                context: format!("wrapped {phase} timeout"),
                reason: e.to_string(),
            })?
            // Clamp before casting so an oversized duration cannot wrap negative.
            .map(|d| d.as_secs().min(i64::MAX as u64) as i64)
            .unwrap_or(0);

    overlay_wrapped_action_symbols(
        action_symtab,
        Some(context),
        &cmd,
        &args,
        &wrapped_env,
        wrapped_timeout_secs,
    )
}
/// Writes the `WrappedAction.*` symbols into `symtab`, plus `WrappedEnv.Name`
/// or `WrappedStep.Name` when `wrapped` names what is being wrapped.
///
/// # Errors
/// `SessionError::Runtime` when any symbol cannot be set.
fn overlay_wrapped_action_symbols(
    symtab: &mut SymbolTable,
    wrapped: Option<WrappedContext<'_>>,
    wrapped_command: &str,
    wrapped_args: &[String],
    wrapped_environment: &[String],
    wrapped_timeout_secs: i64,
) -> Result<(), SessionError> {
    if let Some(context) = wrapped {
        let (symbol, wrapped_name) = match context {
            WrappedContext::Env(name) => ("WrappedEnv.Name", name),
            WrappedContext::Step(name) => ("WrappedStep.Name", name),
        };
        set_string_symbol(symtab, symbol, wrapped_name)?;
    }

    set_string_symbol(symtab, "WrappedAction.Command", wrapped_command)?;
    set_string_list_symbol(symtab, "WrappedAction.Args", wrapped_args)?;
    set_string_list_symbol(symtab, "WrappedAction.Environment", wrapped_environment)?;
    set_int_symbol(symtab, "WrappedAction.Timeout", wrapped_timeout_secs)
}
/// Sets `name` in `symtab` to a string value, converting a failure into
/// `SessionError::Runtime`.
fn set_string_symbol(
    symtab: &mut SymbolTable,
    name: &str,
    value: &str,
) -> Result<(), SessionError> {
    let string_value = openjd_expr::ExprValue::String(value.into());
    symtab
        .set(name, string_value)
        .map_err(|e| SessionError::Runtime(format!("Failed to set {name}: {e}")))
}
/// Sets `name` in `symtab` to a list of strings built from `values`.
///
/// # Errors
/// `SessionError::Runtime` when the list cannot be built or the symbol
/// cannot be set.
fn set_string_list_symbol(
    symtab: &mut SymbolTable,
    name: &str,
    values: &[String],
) -> Result<(), SessionError> {
    let items: Vec<openjd_expr::ExprValue> = values
        .iter()
        .cloned()
        .map(openjd_expr::ExprValue::String)
        .collect();
    let list_value = match openjd_expr::ExprValue::make_list(items, openjd_expr::ExprType::STRING)
    {
        Ok(list) => list,
        Err(e) => return Err(SessionError::Runtime(format!("make_list({name}): {e}"))),
    };
    symtab
        .set(name, list_value)
        .map_err(|e| SessionError::Runtime(format!("Failed to set {name}: {e}")))
}
/// Sets `name` in `symtab` to an integer value, converting a failure into
/// `SessionError::Runtime`.
fn set_int_symbol(symtab: &mut SymbolTable, name: &str, value: i64) -> Result<(), SessionError> {
    let int_value = openjd_expr::ExprValue::Int(value);
    symtab
        .set(name, int_value)
        .map_err(|e| SessionError::Runtime(format!("Failed to set {name}: {e}")))
}
impl Drop for Session {
    /// Safety net for sessions dropped without `cleanup()`: logs a warning
    /// and, unless the working directory is retained, makes a best-effort
    /// attempt to remove it.
    fn drop(&mut self) {
        if self.cleanup_called {
            return;
        }
        log::warn!(
            target: "openjd.sessions",
            "Session '{}' was dropped without calling cleanup(). \
             Working directory may not have been cleaned up.",
            self.session_id
        );
        if self.retain_working_dir {
            return;
        }
        // Errors cannot be reported from `drop`, so the result is ignored.
        let _ = std::fs::remove_dir_all(&self.working_directory);
    }
}
#[cfg(test)]
mod wrap_actions_tests {
    use super::*;
    use openjd_expr::ExprValue;
    use openjd_model::format_string::FormatString;
    use openjd_model::job::{Action, EnvironmentActions, EnvironmentScript};

    /// Builds a `FormatString` from a literal, panicking if it is invalid.
    fn fs(s: &str) -> FormatString {
        FormatString::new(s).unwrap()
    }

    /// A minimal action that runs `echo` with no arguments.
    fn echo() -> Action {
        Action {
            command: fs("echo"),
            args: None,
            timeout: None,
            cancelation: None,
        }
    }

    /// An environment named `name` whose script carries exactly `actions`.
    fn env_with_actions(name: &str, actions: EnvironmentActions) -> Environment {
        let script = EnvironmentScript {
            let_bindings: None,
            actions,
            embedded_files: None,
        };
        Environment {
            name: name.to_string(),
            description: None,
            script: Some(script),
            variables: None,
            resolved_symtab: None,
        }
    }

    /// Environment actions with every hook unset.
    fn empty_actions() -> EnvironmentActions {
        EnvironmentActions {
            on_enter: None,
            on_wrap_env_enter: None,
            on_wrap_task_run: None,
            on_wrap_env_exit: None,
            on_exit: None,
        }
    }

    #[test]
    fn env_has_any_wrap_hook_returns_false_for_plain_env() {
        let plain_actions = EnvironmentActions {
            on_enter: Some(echo()),
            on_exit: Some(echo()),
            ..empty_actions()
        };
        let env = env_with_actions("Plain", plain_actions);
        assert!(!env_has_any_wrap_hook(&env));
    }

    #[test]
    fn env_has_any_wrap_hook_returns_true_for_each_hook() {
        let with_env_enter = EnvironmentActions {
            on_wrap_env_enter: Some(echo()),
            ..empty_actions()
        };
        let with_task_run = EnvironmentActions {
            on_wrap_task_run: Some(echo()),
            ..empty_actions()
        };
        let with_env_exit = EnvironmentActions {
            on_wrap_env_exit: Some(echo()),
            ..empty_actions()
        };
        for actions in [with_env_enter, with_task_run, with_env_exit] {
            let env = env_with_actions("Wrap", actions);
            assert!(env_has_any_wrap_hook(&env));
        }
    }

    #[test]
    fn env_has_any_wrap_hook_returns_false_when_script_missing() {
        let env = Environment {
            name: "NoScript".into(),
            description: None,
            script: None,
            variables: None,
            resolved_symtab: None,
        };
        assert!(!env_has_any_wrap_hook(&env));
    }

    #[test]
    fn overlay_sets_wrapped_action_symbols_for_task_hook() {
        let mut symtab = SymbolTable::default();
        let args: Vec<String> = vec!["a".into(), "b c".into()];
        let environment: Vec<String> = vec!["FOO=bar".into()];
        overlay_wrapped_action_symbols(
            &mut symtab,
            Some(WrappedContext::Step("MyStep")),
            "echo",
            &args,
            &environment,
            42,
        )
        .unwrap();

        let expected_command = ExprValue::String("echo".into());
        assert_eq!(
            symtab.get_value("WrappedAction.Command"),
            Some(&expected_command)
        );
        let expected_timeout = ExprValue::Int(42);
        assert_eq!(
            symtab.get_value("WrappedAction.Timeout"),
            Some(&expected_timeout)
        );
        let expected_step = ExprValue::String("MyStep".into());
        assert_eq!(symtab.get_value("WrappedStep.Name"), Some(&expected_step));
        assert!(symtab.get_value("WrappedEnv.Name").is_none());
    }

    #[test]
    fn overlay_sets_wrapped_env_name_when_provided() {
        let mut symtab = SymbolTable::default();
        overlay_wrapped_action_symbols(
            &mut symtab,
            Some(WrappedContext::Env("InnerEnv")),
            "true",
            &[],
            &[],
            0,
        )
        .unwrap();

        let expected_env = ExprValue::String("InnerEnv".into());
        assert_eq!(symtab.get_value("WrappedEnv.Name"), Some(&expected_env));
        assert!(symtab.get_value("WrappedStep.Name").is_none());
    }

    #[test]
    fn overlay_handles_empty_args_and_environment() {
        let mut symtab = SymbolTable::default();
        overlay_wrapped_action_symbols(&mut symtab, None, "true", &[], &[], 0).unwrap();

        assert!(symtab.get_value("WrappedAction.Args").is_some());
        assert!(symtab.get_value("WrappedAction.Environment").is_some());
        let expected_timeout = ExprValue::Int(0);
        assert_eq!(
            symtab.get_value("WrappedAction.Timeout"),
            Some(&expected_timeout)
        );
    }
}