1use std::collections::{HashMap, HashSet};
29use std::path::{Path, PathBuf};
30use std::sync::Arc;
31use std::time::Duration;
32
33use openjd_expr::function_library::FunctionLibrary;
34use openjd_expr::path_mapping::PathMappingRule;
35use openjd_model::job::{Environment, StepScript};
36use openjd_model::symbol_table::SymbolTable;
37use openjd_model::types::JobParameterValues;
38use tokio_util::sync::CancellationToken;
39
40use crate::action::{ActionMessage, ActionResult, ActionState};
41use crate::action_status::ActionStatus;
42use crate::cross_user_helper::run_via_helper;
43
/// Notify period, in seconds, written into a `NOTIFY_THEN_TERMINATE` cancel
/// command when the caller of `Session::cancel_action` supplies no time limit.
pub const DEFAULT_CANCEL_NOTIFY_PERIOD_SECS: u64 = 5;
47#[cfg(unix)]
48use crate::cross_user_helper::CrossUserHelper;
49#[cfg(windows)]
50use crate::cross_user_helper::CrossUserHelperWin;
51use crate::error::SessionError;
52use crate::logging::{log_section_banner, LogContent};
53use crate::runner::env_script::EnvironmentScriptRunner;
54use crate::runner::step_script::StepScriptRunner;
55use crate::session_log;
56use crate::session_user::SessionUser;
57
/// Lifecycle state of a session.
///
/// The `Display` form is the upper-snake-case name of the variant
/// (for example `READY_ENDING`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionState {
    /// No action is in flight; new actions may be started.
    Ready,
    /// An action is currently being run.
    Running,
    /// A cancel was requested for the running action.
    Canceling,
    /// Only environment exits are accepted from here on.
    ReadyEnding,
    /// Cleanup has run; the session is finished.
    Ended,
}

impl std::fmt::Display for SessionState {
    /// Writes the upper-snake-case label for the state.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Ready => "READY",
            Self::Running => "RUNNING",
            Self::Canceling => "CANCELING",
            Self::ReadyEnding => "READY_ENDING",
            Self::Ended => "ENDED",
        };
        f.write_str(label)
    }
}
86
/// Key under which an entered environment is tracked by a session.
pub type EnvironmentIdentifier = String;
89
/// Callback invoked with the session id and the current action status.
pub type SessionCallbackType = Box<dyn Fn(&str, &ActionStatus) + Send + Sync>;
92
/// Inputs consumed by `Session::with_config` to build a session.
pub struct SessionConfig {
    /// Identifier of the session; also passed to the working-directory
    /// `TempDir` constructor and attached to log records.
    pub session_id: String,
    /// Job parameter values stored on the session.
    pub job_parameter_values: JobParameterValues,
    /// Path mapping rules; `None` is treated as an empty list. The session
    /// sorts them by descending `source_path` length.
    pub path_mapping_rules: Option<Vec<PathMappingRule>>,
    /// When `true`, `Session::cleanup` leaves the working directory in place.
    pub retain_working_dir: bool,
    /// Optional observer notified with the action status.
    pub callback: Option<SessionCallbackType>,
    /// Base environment variables for the session; `None` means empty.
    pub os_env_vars: Option<HashMap<String, String>>,
    /// Directory under which the working directory is created. When `None`,
    /// `crate::tempdir::openjd_temp_dir(None)` is used.
    pub session_root_directory: Option<PathBuf>,
    /// User the session's directories are created for. If it is not the
    /// process user, a cross-user helper is spawned.
    pub user: Option<Arc<dyn SessionUser>>,
    /// Model profile used to derive the expression function library and to
    /// decide whether redactions are enabled.
    pub profile: Option<openjd_model::ModelProfile>,
    /// Parent cancellation token; per-action tokens are created as children
    /// of it when present.
    pub cancel_token: Option<CancellationToken>,
    /// How a missing sticky bit on the root directory is handled (checked on
    /// Unix only).
    pub sticky_bit_policy: crate::tempdir::StickyBitPolicy,
    /// Forwarded to the script runners and subprocess configuration.
    pub debug_collect_stdout: bool,
    /// Forwarded to the script runners and the action filter.
    pub echo_openjd_directives: bool,
}
142
/// Renders an optional process exit code for error messages.
///
/// Yields `exit code: <n>` when a code is known and `exit code: N/A` otherwise.
fn format_exit_code(code: Option<i32>) -> String {
    code.map_or_else(
        || "exit code: N/A".to_string(),
        |value| format!("exit code: {value}"),
    )
}
149
/// Returns the canonical form of an environment variable name.
///
/// On Windows the name is upper-cased; on every other platform it is
/// returned unchanged.
fn normalize_env_key(name: &str) -> String {
    if cfg!(windows) {
        name.to_uppercase()
    } else {
        name.to_owned()
    }
}
163
/// Environment variable changes recorded for one environment, keyed by
/// variable name. `Some(value)` records a set; presumably `None` records an
/// unset — TODO confirm against the code that writes `None`.
type EnvVarChanges = HashMap<String, Option<String>>;
169
/// Mutable bookkeeping for the most recent action; `Session::action_status`
/// snapshots these fields into an `ActionStatus`.
struct ActionStatusFields {
    /// State of the action; `None` until an action has been started.
    state: Option<ActionState>,
    /// Progress value of the action, if any has been recorded.
    progress: Option<f64>,
    /// Status message of the action, if any has been recorded.
    status_message: Option<String>,
    /// Failure message of the action, if any has been recorded.
    fail_message: Option<String>,
    /// Exit code of the action's process, if known.
    exit_code: Option<i32>,
    /// Wall-clock time at which the action started.
    started_at: Option<std::time::SystemTime>,
    /// Wall-clock time at which the action ended.
    ended_at: Option<std::time::SystemTime>,
}
180
181impl ActionStatusFields {
182 fn new() -> Self {
183 Self {
184 state: None,
185 progress: None,
186 status_message: None,
187 fail_message: None,
188 exit_code: None,
189 started_at: None,
190 ended_at: None,
191 }
192 }
193
194 fn reset(&mut self) {
196 self.state = Some(ActionState::Running);
197 self.started_at = Some(std::time::SystemTime::now());
198 self.ended_at = None;
199 self.progress = None;
200 self.status_message = None;
201 self.fail_message = None;
202 self.exit_code = None;
203 }
204}
205
/// Cancellation plumbing for the action currently in flight.
struct CancelFields {
    /// Token of the current action; cancelled by `Session::cancel_action`.
    token: Option<CancellationToken>,
    /// Channel over which `Session::cancel_action` publishes the requested
    /// time limit.
    request_tx: Option<tokio::sync::watch::Sender<Option<Duration>>>,
    /// Set when a cancel request asked for the action to be marked as failed.
    mark_failed: bool,
    /// Optional parent token from which per-action tokens are derived.
    parent_token: Option<CancellationToken>,
}
217
218impl CancelFields {
219 fn new(parent_token: Option<CancellationToken>) -> Self {
220 Self {
221 token: None,
222 request_tx: None,
223 mark_failed: false,
224 parent_token,
225 }
226 }
227}
228
/// State needed to run actions as a user other than the process user.
struct CrossUserFields {
    /// User the session runs actions as, if one was configured.
    user: Option<Arc<dyn SessionUser>>,
    /// Directory holding the helper binary; set only when a helper was spawned.
    helpers_dir: Option<PathBuf>,
    /// Spawned helper process handle (Unix flavour). It is lent to a runner
    /// for the duration of an action and taken back afterwards.
    #[cfg(unix)]
    helper: Option<CrossUserHelper>,
    /// Spawned helper process handle (Windows flavour).
    #[cfg(windows)]
    helper: Option<CrossUserHelperWin>,
    /// Write end used to send cancel commands to the helper.
    cancel_writer: Option<std::fs::File>,
    /// Token embedded in cancel commands sent to the helper.
    helper_auth_token: Option<String>,
}
245
246fn derive_library(
254 profile: Option<&openjd_model::ModelProfile>,
255 rules: &Arc<Vec<PathMappingRule>>,
256) -> Arc<FunctionLibrary> {
257 let host = openjd_expr::HostContext::WithRules(rules.clone());
258 let expr_profile = match profile {
259 Some(p) => p.to_expr_profile(host),
260 None => openjd_expr::ExprProfile::current().with_host_context(host),
261 };
262 openjd_expr::FunctionLibrary::for_profile(&expr_profile)
263}
264
/// A session: owns the working directory, the stack of entered environments,
/// and the state of the action currently in flight.
pub struct Session {
    /// Identifier of the session, used in logs and generated identifiers.
    session_id: String,
    /// Current lifecycle state.
    state: SessionState,
    /// Set once an environment exit was requested without keeping the session
    /// running; later script-less exits then leave the state at `ReadyEnding`.
    ending_only: bool,
    /// Path of the session working directory.
    working_directory: PathBuf,
    /// Path of the embedded-files directory.
    files_directory: PathBuf,
    /// When `true`, `cleanup` does not delete the working directory.
    retain_working_dir: bool,
    /// Guards `cleanup` so that only the first call does any work.
    cleanup_called: bool,
    /// Owning handle for the working directory (`None` for test sessions).
    _working_dir: Option<crate::tempdir::TempDir>,
    /// Owning handle for the embedded-files directory (`None` for test sessions).
    _files_dir: Option<crate::tempdir::TempDir>,
    /// Environments currently registered, by identifier.
    environments: HashMap<EnvironmentIdentifier, Environment>,
    /// Identifiers in entry order; exits must happen in reverse (LIFO) order.
    environments_entered: Vec<EnvironmentIdentifier>,
    /// Variables defined by entered environments, keyed by normalized name.
    env_vars: HashMap<String, String>,
    /// Base environment supplied through `SessionConfig::os_env_vars`.
    process_env: HashMap<String, String>,
    /// Per-environment record of the variable changes it introduced.
    created_env_vars: HashMap<EnvironmentIdentifier, EnvVarChanges>,
    /// Expression function library derived from `profile` and
    /// `path_mapping_rules`; rebuilt whenever either changes.
    library: Arc<FunctionLibrary>,
    /// Path mapping rules, sorted by descending `source_path` length.
    path_mapping_rules: Arc<Vec<PathMappingRule>>,
    /// Job parameter values supplied at construction.
    job_parameter_values: JobParameterValues,
    /// Status bookkeeping for the most recent action.
    action: ActionStatusFields,
    /// Cancellation plumbing for the action in flight.
    cancel: CancelFields,
    /// Cross-user helper state.
    cross_user: CrossUserFields,
    /// Optional observer notified with the action status.
    callback: Option<SessionCallbackType>,
    /// Values that `redact` masks in text.
    redacted_values: HashSet<String>,
    /// Model profile, if one was configured.
    profile: Option<openjd_model::ModelProfile>,
    /// Forwarded to runners and subprocess configuration.
    debug_collect_stdout: bool,
    /// Forwarded to runners and the action filter.
    echo_openjd_directives: bool,
}
307
308impl Session {
/// Test-only constructor: builds a `Ready` session rooted at
/// `working_directory` without creating any directories, spawning a helper,
/// or logging. The embedded-files path is `working_directory/embedded_files`.
#[cfg(any(test, feature = "test-utils"))]
pub fn new_for_test(working_directory: PathBuf) -> Self {
    let files_directory = working_directory.join("embedded_files");
    // No cross-user support in test sessions.
    let cross_user = CrossUserFields {
        user: None,
        helpers_dir: None,
        helper: None,
        cancel_writer: None,
        helper_auth_token: None,
    };
    // Default profile with an empty rule set.
    let library = derive_library(None, &Arc::new(Vec::new()));

    Self {
        session_id: String::new(),
        state: SessionState::Ready,
        ending_only: false,
        working_directory,
        files_directory,
        retain_working_dir: false,
        cleanup_called: false,
        _working_dir: None,
        _files_dir: None,
        environments: HashMap::new(),
        environments_entered: Vec::new(),
        env_vars: HashMap::new(),
        process_env: HashMap::new(),
        created_env_vars: HashMap::new(),
        library,
        path_mapping_rules: Arc::new(Vec::new()),
        job_parameter_values: HashMap::new(),
        action: ActionStatusFields::new(),
        cancel: CancelFields::new(None),
        cross_user,
        callback: None,
        redacted_values: HashSet::new(),
        profile: None,
        debug_collect_stdout: true,
        echo_openjd_directives: true,
    }
}
348
/// Test-only: installs `writer` as the file that `cancel_action` writes
/// cancel commands to.
#[cfg(any(test, feature = "test-utils"))]
pub fn set_cancel_writer_for_test(&mut self, writer: std::fs::File) {
    self.cross_user.cancel_writer = Some(writer);
}
355
/// Test-only: sets the auth token that `cancel_action` embeds in cancel
/// commands.
#[cfg(any(test, feature = "test-utils"))]
pub fn set_helper_auth_token_for_test(&mut self, token: String) {
    self.cross_user.helper_auth_token = Some(token);
}
363
/// Test-only: forces the session into `state`, bypassing the normal
/// transitions.
#[cfg(any(test, feature = "test-utils"))]
pub fn set_state_for_test(&mut self, state: SessionState) {
    self.state = state;
}
370
/// Builds a session from `config`.
///
/// Steps, in order: resolve the root directory, apply the sticky-bit policy
/// (Unix only), create the working and embedded-files directories, sort the
/// path mapping rules and derive the function library, spawn the cross-user
/// helper when `config.user` is not the process user, and log host details.
///
/// # Errors
/// Propagates failures from temp-dir resolution and creation, helper
/// directory creation, helper binary writing and helper spawn. On Unix,
/// returns `SessionError::PathPermissions` when the policy is
/// `StickyBitPolicy::Strict` and `find_missing_sticky_bit` reports a path.
pub fn with_config(mut config: SessionConfig) -> Result<Self, SessionError> {
    // Caller-supplied root wins; otherwise fall back to the default temp dir.
    let root_dir = match &config.session_root_directory {
        Some(d) => d.clone(),
        None => crate::tempdir::openjd_temp_dir(None)?,
    };

    #[cfg(unix)]
    {
        use crate::tempdir::StickyBitPolicy;
        match config.sticky_bit_policy {
            // Strict: a reported path is a hard error.
            StickyBitPolicy::Strict => {
                if let Some(path) = crate::tempdir::find_missing_sticky_bit(&root_dir) {
                    return Err(SessionError::PathPermissions {
                        path: path.display().to_string(),
                        reason: format!(
                            "Directory is world-writable without the sticky bit set. \
                             This allows other users to modify or delete session files. \
                             Set the sticky bit (chmod +t {}) or use \
                             StickyBitPolicy::Warn to override.",
                            path.display()
                        ),
                    });
                }
            }
            // Warn: same check, but only logged.
            StickyBitPolicy::Warn => {
                if let Some(path) = crate::tempdir::find_missing_sticky_bit(&root_dir) {
                    log::warn!(
                        target: "openjd.sessions",
                        "Sticky bit is not set on {}. This may pose a risk when running \
                         work on this host as users may modify or delete files in this \
                         directory which do not belong to them.",
                        path.display()
                    );
                }
            }
            StickyBitPolicy::Disabled => {}
        }
    }

    // The embedded-files directory is created inside the working directory.
    let working_dir = crate::tempdir::TempDir::new(
        Some(&root_dir),
        Some(&config.session_id),
        config.user.as_deref(),
    )?;
    let files_dir = crate::tempdir::TempDir::new(
        Some(working_dir.path()),
        Some("embedded_files"),
        config.user.as_deref(),
    )?;

    let working_directory = working_dir.path().to_path_buf();
    let files_directory = files_dir.path().to_path_buf();

    // Longest source path first.
    let mut path_mapping_rules = config.path_mapping_rules.unwrap_or_default();
    path_mapping_rules.sort_by_key(|r| std::cmp::Reverse(r.source_path.len()));
    let path_mapping_rules = Arc::new(path_mapping_rules);
    let profile = config.profile.take();
    let library = derive_library(profile.as_ref(), &path_mapping_rules);
    let process_env = config.os_env_vars.unwrap_or_default();

    // Only set when a cross-user helper is actually spawned below.
    let mut helpers_dir = None;

    // Spawn the helper only when the session user differs from the process user.
    #[cfg(unix)]
    let (helper, cancel_writer, helper_auth_token) = if let Some(ref user) = config.user {
        if !user.is_process_user() {
            let hdir = crate::helper_binary::create_helpers_dir(
                &working_directory,
                Some(user.as_ref()),
            )?;
            let helper_path = crate::helper_binary::write_helper(&hdir, user.as_ref())?;
            let (h, cw) = CrossUserHelper::spawn(&helper_path, user.as_ref())?;
            let token = h.auth_token().to_string();
            helpers_dir = Some(hdir);
            (Some(h), Some(cw), Some(token))
        } else {
            (None, None, None)
        }
    } else {
        (None, None, None)
    };

    // Windows counterpart of the block above, using the Windows helper type.
    #[cfg(windows)]
    let (helper, cancel_writer, helper_auth_token) = if let Some(ref user) = config.user {
        if !user.is_process_user() {
            let hdir = crate::helper_binary::create_helpers_dir(
                &working_directory,
                Some(user.as_ref()),
            )?;
            let helper_path = crate::helper_binary::write_helper(&hdir, user.as_ref())?;
            let (h, cw) = CrossUserHelperWin::spawn(&helper_path, user.as_ref())?;
            let token = h.auth_token().to_string();
            helpers_dir = Some(hdir);
            (Some(h), Some(cw), Some(token))
        } else {
            (None, None, None)
        }
    } else {
        (None, None, None)
    };

    // Host and session details are logged once at start-up.
    session_log!(
        info,
        &config.session_id,
        LogContent::HOST_INFO,
        "openjd-sessions Library Version: {}",
        env!("CARGO_PKG_VERSION")
    );
    session_log!(
        info,
        &config.session_id,
        LogContent::HOST_INFO,
        "Platform: {}",
        std::env::consts::OS
    );
    session_log!(
        info,
        &config.session_id,
        LogContent::HOST_INFO,
        "Architecture: {}",
        std::env::consts::ARCH
    );
    log::info!(target: "openjd.sessions", session_id = config.session_id.as_str(); "Initializing Open Job Description Session: {}", &config.session_id);
    session_log!(
        info,
        &config.session_id,
        LogContent::FILE_PATH,
        "Session Working Directory: {}",
        working_directory.display()
    );
    session_log!(
        info,
        &config.session_id,
        LogContent::FILE_PATH,
        "Session's Embedded Files Directory: {}",
        files_directory.display()
    );

    Ok(Self {
        session_id: config.session_id,
        state: SessionState::Ready,
        ending_only: false,
        working_directory,
        files_directory,
        retain_working_dir: config.retain_working_dir,
        cleanup_called: false,
        _working_dir: Some(working_dir),
        _files_dir: Some(files_dir),
        environments: HashMap::new(),
        environments_entered: Vec::new(),
        env_vars: HashMap::new(),
        process_env,
        created_env_vars: HashMap::new(),
        library,
        path_mapping_rules,
        job_parameter_values: config.job_parameter_values,
        action: ActionStatusFields::new(),
        cancel: CancelFields::new(config.cancel_token),
        cross_user: CrossUserFields {
            user: config.user,
            helpers_dir,
            helper,
            cancel_writer,
            helper_auth_token,
        },
        callback: config.callback,
        redacted_values: HashSet::new(),
        profile,
        debug_collect_stdout: config.debug_collect_stdout,
        echo_openjd_directives: config.echo_openjd_directives,
    })
}
551
552 pub fn with_path_mapping(mut self, mut rules: Vec<PathMappingRule>) -> Self {
553 rules.sort_by_key(|r| std::cmp::Reverse(r.source_path.len()));
554 self.path_mapping_rules = Arc::new(rules);
555 self.library = derive_library(self.profile.as_ref(), &self.path_mapping_rules);
556 self
557 }
558
559 pub fn extend_path_mapping_rules(&mut self, additional: Vec<PathMappingRule>) {
562 let mut rules = (*self.path_mapping_rules).clone();
563 rules.extend(additional);
564 rules.sort_by_key(|r| std::cmp::Reverse(r.source_path.len()));
565 self.path_mapping_rules = Arc::new(rules);
566 self.library = derive_library(self.profile.as_ref(), &self.path_mapping_rules);
567 }
568
569 pub fn path_mapping_rules(&self) -> &[PathMappingRule] {
571 &self.path_mapping_rules
572 }
573
574 pub fn with_profile(mut self, profile: openjd_model::ModelProfile) -> Self {
580 self.profile = Some(profile);
581 self.library = derive_library(self.profile.as_ref(), &self.path_mapping_rules);
582 self
583 }
584
585 fn redactions_enabled(&self) -> bool {
588 match &self.profile {
589 Some(p) => {
590 p.revision() > openjd_model::types::SpecificationRevision::V2023_09
591 || p.has_extension(openjd_model::types::ModelExtension::RedactedEnvVars)
592 }
593 None => false,
594 }
595 }
596
/// Borrows the session's function library in the `Option` shape expected by
/// `FormatStringOptions::with_library`. Always returns `Some`.
fn lib(&self) -> Option<&FunctionLibrary> {
    Some(&self.library)
}
600
601 fn notify_callback(&self) {
604 if let Some(cb) = &self.callback {
605 if let Some(status) = self.action_status() {
606 cb(&self.session_id, &status);
607 }
608 }
609 }
610
611 pub fn get_enabled_extensions(&self) -> Vec<String> {
616 match &self.profile {
617 Some(p) => {
618 let mut exts: Vec<String> = p
619 .extensions()
620 .iter()
621 .map(|e| e.as_str().to_string())
622 .collect();
623 exts.sort();
624 exts
625 }
626 None => Vec::new(),
627 }
628 }
629
/// Returns the session identifier.
pub fn session_id(&self) -> &str {
    &self.session_id
}
/// Returns the current lifecycle state.
pub fn state(&self) -> SessionState {
    self.state
}
/// Returns the path of the session working directory.
pub fn working_directory(&self) -> &Path {
    &self.working_directory
}
/// Returns the path of the embedded-files directory.
pub fn files_directory(&self) -> &Path {
    &self.files_directory
}
/// Returns the identifiers of the currently entered environments, oldest
/// first.
pub fn environments_entered(&self) -> &[EnvironmentIdentifier] {
    &self.environments_entered
}
645
646 pub fn clone_cancel_writer(&self) -> Option<std::fs::File> {
650 self.cross_user
651 .cancel_writer
652 .as_ref()
653 .and_then(|f| f.try_clone().ok())
654 }
655
656 pub fn action_status(&self) -> Option<ActionStatus> {
658 self.action.state.map(|state| ActionStatus {
659 state,
660 progress: self.action.progress,
661 status_message: self.action.status_message.clone(),
662 fail_message: self.action.fail_message.clone(),
663 exit_code: self.action.exit_code,
664 started_at: self.action.started_at,
665 ended_at: self.action.ended_at,
666 })
667 }
668
/// Overwrites the recorded action state with `state`. No other status field
/// is touched and the callback is not notified.
pub fn override_action_state(&mut self, state: ActionState) {
    self.action.state = Some(state);
}
674
675 pub fn redact(&self, text: &str) -> String {
677 let mut vals: Vec<&str> = self.redacted_values.iter().map(|s| s.as_str()).collect();
680 vals.sort_by_key(|s| std::cmp::Reverse(s.len()));
681 let mut result = text.to_string();
682 for val in vals {
683 result = result.replace(val, "********");
684 }
685 result
686 }
687
688 fn new_action_cancel_token(&self) -> CancellationToken {
692 match &self.cancel.parent_token {
693 Some(parent) => parent.child_token(),
694 None => CancellationToken::new(),
695 }
696 }
697
698 pub fn cancel_action(
700 &mut self,
701 time_limit: Option<Duration>,
702 mark_action_failed: bool,
703 ) -> Result<(), SessionError> {
704 if self.state != SessionState::Running {
705 return Err(SessionError::InvalidState {
706 expected: vec![SessionState::Running],
707 current: self.state,
708 });
709 }
710 self.state = SessionState::Canceling;
711 if mark_action_failed {
712 self.cancel.mark_failed = true;
713 }
714
715 if let Some(ref mut writer) = self.cross_user.cancel_writer {
724 use std::io::Write;
725 let is_terminate = matches!(time_limit, Some(d) if d.is_zero());
726 let token_field = match &self.cross_user.helper_auth_token {
727 Some(t) => format!(r#""token":"{t}","#),
728 None => String::new(),
729 };
730 let cmd = if is_terminate {
731 format!(r#"{{{token_field}"cancel":"TERMINATE"}}"#)
732 } else {
733 let notify_period = time_limit
734 .unwrap_or(Duration::from_secs(DEFAULT_CANCEL_NOTIFY_PERIOD_SECS))
735 .as_secs();
736 format!(
737 r#"{{{token_field}"cancel":"NOTIFY_THEN_TERMINATE","notifyPeriodInSeconds":{notify_period}}}"#
738 )
739 };
740 let _ = writer.write_all(cmd.as_bytes());
741 let _ = writer.write_all(b"\n");
742 let _ = writer.flush();
743 }
744
745 if let Some(tx) = &self.cancel.request_tx {
746 let _ = tx.send(time_limit);
747 }
748 if let Some(token) = &self.cancel.token {
749 token.cancel();
750 }
751 Ok(())
752 }
753
/// Tears the session down and moves it to `SessionState::Ended`.
///
/// Only the first call does any work; later calls return immediately.
/// Unless `retain_working_dir` is set, this shuts down the cross-user helper
/// (if any), removes the working directory's entries as the session user
/// (Unix, cross-user sessions only), and then deletes the embedded-files and
/// working directories. All deletion errors are ignored.
pub fn cleanup(&mut self) {
    if self.cleanup_called {
        return;
    }
    self.cleanup_called = true;

    if !self.retain_working_dir {
        log_section_banner(&self.session_id, "Session Cleanup");

        // NOTE(review): the helper is only shut down on this branch; confirm
        // it is released some other way when the working dir is retained.
        if let Some(ref mut helper) = self.cross_user.helper {
            helper.shutdown();
        }

        session_log!(
            info,
            &self.session_id,
            LogContent::FILE_PATH,
            "Deleting working directory: {}",
            self.working_directory.display()
        );

        // Cross-user sessions: delete the top-level entries via `sudo -u <user>`
        // first. Presumably this covers files the process user cannot remove
        // itself — TODO confirm.
        #[cfg(unix)]
        if let Some(ref user) = self.cross_user.user {
            if !user.is_process_user() {
                if let Ok(entries) = std::fs::read_dir(&self.working_directory) {
                    // Unreadable entries are skipped; paths are converted lossily.
                    let files: Vec<String> = entries
                        .filter_map(|e| e.ok())
                        .map(|e| e.path().to_string_lossy().to_string())
                        .collect();
                    if !files.is_empty() {
                        let mut args = vec![
                            "-u".to_string(),
                            user.user().to_string(),
                            "-i".to_string(),
                            "rm".to_string(),
                            "-rf".to_string(),
                            "--".to_string(),
                        ];
                        args.extend(files);
                        // Exit status is ignored (best effort).
                        let _ = std::process::Command::new("sudo")
                            .args(&args)
                            .stdin(std::process::Stdio::null())
                            .stdout(std::process::Stdio::null())
                            .stderr(std::process::Stdio::null())
                            .status();
                    }
                }
            }
        }

        // Inner directory first, then the working directory that contains it.
        if let Some(ref mut files_dir) = self._files_dir {
            let _ = files_dir.cleanup();
        }
        if let Some(ref mut working_dir) = self._working_dir {
            let _ = working_dir.cleanup();
        } else {
            // No owning handle (e.g. a test session): remove the path directly.
            let _ = std::fs::remove_dir_all(&self.working_directory);
        }
    }
    self.state = SessionState::Ended;
}
820
821 pub async fn enter_environment(
824 &mut self,
825 env: &Environment,
826 resolved_symtab: Option<&openjd_expr::SerializedSymbolTable>,
827 identifier: Option<&str>,
828 os_env_vars: Option<&HashMap<String, String>>,
829 ) -> Result<String, SessionError> {
830 let (id, _stdout) = self
831 .enter_environment_with_output(env, resolved_symtab, identifier, os_env_vars)
832 .await?;
833 Ok(id)
834 }
835
/// Enters `env`, running its `onEnter` action if it has one, and returns the
/// environment identifier together with the action's captured stdout (empty
/// when there is no `onEnter` action).
///
/// The environment is registered before its variables are resolved and before
/// the action runs, so it stays registered if a later step fails.
///
/// # Errors
/// - `SessionError::InvalidState` unless the session is `Ready`.
/// - `SessionError::DuplicateEnvironment` when `identifier` is already registered.
/// - `SessionError::FormatString` when an environment variable fails to resolve.
/// - `SessionError::EnvironmentScriptFailed` when `onEnter` does not end in
///   `ActionState::Success`.
/// - Errors from building the symbol table, materializing path mapping, or
///   driving the action are propagated.
pub async fn enter_environment_with_output(
    &mut self,
    env: &Environment,
    resolved_symtab: Option<&openjd_expr::SerializedSymbolTable>,
    identifier: Option<&str>,
    os_env_vars: Option<&HashMap<String, String>>,
) -> Result<(String, String), SessionError> {
    if self.state != SessionState::Ready {
        return Err(SessionError::InvalidState {
            expected: vec![SessionState::Ready],
            current: self.state,
        });
    }

    let symtab = self.build_symbol_table(None, resolved_symtab)?;

    // Use the caller's identifier if unique; otherwise generate one.
    let identifier = match identifier {
        Some(id) => {
            if self.environments.contains_key(id) {
                return Err(SessionError::DuplicateEnvironment { id: id.to_string() });
            }
            id.to_string()
        }
        None => format!("{}:{}", self.session_id, uuid::Uuid::new_v4().simple()),
    };
    // Register the environment up front.
    self.environments.insert(identifier.clone(), env.clone());
    self.environments_entered.push(identifier.clone());
    self.created_env_vars
        .insert(identifier.clone(), HashMap::new());

    if let Some(vars) = &env.variables {
        // Resolve each variable and record it both globally and per environment.
        for (key, fmt_str) in vars {
            let value = fmt_str
                .resolve_string_with(
                    &symtab,
                    &openjd_expr::FormatStringOptions::new().with_library(self.lib()),
                )
                .map_err(|e| SessionError::FormatString {
                    context: format!("env var '{key}'"),
                    reason: e.to_string(),
                })?;
            let norm_key = normalize_env_key(key);
            self.env_vars.insert(norm_key.clone(), value.clone());
            if let Some(changes) = self.created_env_vars.get_mut(&identifier) {
                changes.insert(norm_key, Some(value));
            }
        }
    }

    let output = if env
        .script
        .as_ref()
        .and_then(|s| s.actions.on_enter.as_ref())
        .is_some()
    {
        // There is an onEnter action: run it as a tracked action.
        self.action.reset();
        self.state = SessionState::Running;
        self.notify_callback();

        log_section_banner(
            &self.session_id,
            &format!("Entering Environment: {}", env.name),
        );

        // Fresh cancellation plumbing for this action, reachable from `cancel_action`.
        let cancel_token = self.new_action_cancel_token();
        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
        self.cancel.token = Some(cancel_token.clone());
        self.cancel.request_tx = Some(cancel_tx);

        let env_vars = self.evaluate_env_vars(os_env_vars);
        let mut action_symtab = symtab.clone();
        // NOTE(review): if this fails the function returns with the state
        // still `Running` — confirm that is intended.
        self.materialize_path_mapping(&mut action_symtab)?;
        let action_symtab = Box::new(action_symtab);
        let env_vars = Box::new(env_vars);
        #[allow(unused_mut)]
        let mut runner = EnvironmentScriptRunner::new(
            &self.session_id,
            self.working_directory.clone(),
            self.files_directory.clone(),
            self.cross_user.user.clone(),
        )
        .with_redactions(self.redactions_enabled())
        .with_debug_collect_stdout(self.debug_collect_stdout)
        .with_echo_openjd_directives(self.echo_openjd_directives)
        .with_initial_redacted_values(self.redacted_values.iter().cloned().collect())
        .with_cancel_token(cancel_token)
        .with_cancel_request_rx(cancel_rx);
        if let Some(ref hdir) = self.cross_user.helpers_dir {
            runner = runner.with_helpers_directory(hdir.clone());
        }
        // Lend the cross-user helper (and a cloned cancel writer, if cloning
        // succeeds) to the runner for the duration of the action.
        let mut runner = match self.cross_user.helper.take() {
            Some(h) => {
                let r = runner.with_helper(h);
                match self
                    .cross_user
                    .cancel_writer
                    .as_ref()
                    .and_then(|f| f.try_clone().ok())
                {
                    Some(w) => r.with_cancel_writer(w),
                    None => r,
                }
            }
            None => runner,
        };

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

        let lib = self.library.clone();
        let runner_fut = Box::pin(runner.enter(env, &action_symtab, Some(&lib), &env_vars, tx));
        let result = self.drive_action(runner_fut, &mut rx, &identifier).await;
        // Take the helper back before propagating any error.
        self.cross_user.helper = runner.take_helper();
        let result = result?;

        if result.state != ActionState::Success {
            return Err(SessionError::EnvironmentScriptFailed {
                name: env.name.clone(),
                action: "onEnter".into(),
                reason: format_exit_code(result.exit_code),
            });
        }
        result.stdout.clone()
    } else {
        // No onEnter action: record an immediately successful action.
        let now = std::time::SystemTime::now();
        self.action.state = Some(ActionState::Success);
        self.action.started_at = Some(now);
        self.action.progress = None;
        self.action.status_message = None;
        self.action.fail_message = None;
        self.action.exit_code = None;

        log_section_banner(
            &self.session_id,
            &format!("Entering Environment: {}", env.name),
        );

        self.action.ended_at = Some(std::time::SystemTime::now());

        if let Some(cb) = &self.callback {
            if let Some(status) = self.action_status() {
                cb(&self.session_id, &status);
            }
        }

        String::new()
    };

    // Return to `Ready` if the state is still `Running` at this point.
    if self.state == SessionState::Running {
        self.state = SessionState::Ready;
    }
    Ok((identifier, output))
}
994
/// Exits the environment registered under `identifier`, running its `onExit`
/// action if it has one, and returns the action's captured stdout (empty when
/// there is no `onExit` action).
///
/// Environments must be exited in reverse order of entry. When
/// `keep_session_running` is `false` the session is flagged as ending.
///
/// # Errors
/// - `SessionError::InvalidState` unless the session is `Ready` or `ReadyEnding`.
/// - `SessionError::UnknownEnvironment` when `identifier` is not registered.
/// - `SessionError::LifoViolation` when `identifier` is not the most recently
///   entered environment.
/// - `SessionError::EnvironmentScriptFailed` when `onExit` does not end in
///   `ActionState::Success`; the session is then left in `ReadyEnding`.
/// - Errors from building the symbol table, materializing path mapping, or
///   driving the action are propagated.
pub async fn exit_environment(
    &mut self,
    identifier: &EnvironmentIdentifier,
    resolved_symtab: Option<&openjd_expr::SerializedSymbolTable>,
    keep_session_running: bool,
    os_env_vars: Option<&HashMap<String, String>>,
) -> Result<String, SessionError> {
    if self.state != SessionState::Ready && self.state != SessionState::ReadyEnding {
        return Err(SessionError::InvalidState {
            expected: vec![SessionState::Ready, SessionState::ReadyEnding],
            current: self.state,
        });
    }

    // Clone the environment so it is still usable after it is unregistered below.
    let env = self
        .environments
        .get(identifier)
        .ok_or_else(|| SessionError::UnknownEnvironment {
            identifier: identifier.to_string(),
        })?
        .clone();

    // Only the most recently entered environment may be exited.
    if self.environments_entered.last() != Some(identifier) {
        return Err(SessionError::LifoViolation {
            expected: self
                .environments_entered
                .last()
                .cloned()
                .unwrap_or_default(),
            got: identifier.clone(),
        });
    }

    let symtab = self.build_symbol_table(None, resolved_symtab)?;

    // Evaluated before the environment is unregistered; presumably so the
    // exiting environment's variables are still visible to onExit — TODO confirm.
    let env_vars = Box::new(self.evaluate_env_vars(os_env_vars));

    if !keep_session_running {
        self.ending_only = true;
    }

    // Unregister before running onExit.
    self.environments.remove(identifier);
    self.environments_entered.pop();

    let output = if env
        .script
        .as_ref()
        .and_then(|s| s.actions.on_exit.as_ref())
        .is_some()
    {
        // There is an onExit action: run it as a tracked action.
        self.action.reset();
        self.state = SessionState::Running;
        self.notify_callback();

        log_section_banner(
            &self.session_id,
            &format!("Exiting Environment: {}", env.name),
        );

        // Fresh cancellation plumbing for this action, reachable from `cancel_action`.
        let cancel_token = self.new_action_cancel_token();
        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
        self.cancel.token = Some(cancel_token.clone());
        self.cancel.request_tx = Some(cancel_tx);

        let mut action_symtab = symtab.clone();
        // NOTE(review): if this fails the function returns with the state
        // still `Running` and the environment already unregistered — confirm
        // that is intended.
        self.materialize_path_mapping(&mut action_symtab)?;
        let action_symtab = Box::new(action_symtab);
        #[allow(unused_mut)]
        let mut runner = EnvironmentScriptRunner::new(
            &self.session_id,
            self.working_directory.clone(),
            self.files_directory.clone(),
            self.cross_user.user.clone(),
        )
        .with_redactions(self.redactions_enabled())
        .with_debug_collect_stdout(self.debug_collect_stdout)
        .with_echo_openjd_directives(self.echo_openjd_directives)
        .with_initial_redacted_values(self.redacted_values.iter().cloned().collect())
        .with_cancel_token(cancel_token)
        .with_cancel_request_rx(cancel_rx);
        if let Some(ref hdir) = self.cross_user.helpers_dir {
            runner = runner.with_helpers_directory(hdir.clone());
        }
        // Lend the cross-user helper (and a cloned cancel writer, if cloning
        // succeeds) to the runner for the duration of the action.
        let mut runner = match self.cross_user.helper.take() {
            Some(h) => {
                let r = runner.with_helper(h);
                match self
                    .cross_user
                    .cancel_writer
                    .as_ref()
                    .and_then(|f| f.try_clone().ok())
                {
                    Some(w) => r.with_cancel_writer(w),
                    None => r,
                }
            }
            None => runner,
        };

        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

        let lib = self.library.clone();
        let runner_fut = Box::pin(runner.exit(&env, &action_symtab, Some(&lib), &env_vars, tx));
        let result = self.drive_action(runner_fut, &mut rx, identifier).await;
        // Take the helper back before propagating any error.
        self.cross_user.helper = runner.take_helper();
        let result = result?;

        if result.state != ActionState::Success {
            // A failed onExit leaves the session accepting only further exits.
            self.state = SessionState::ReadyEnding;
            return Err(SessionError::EnvironmentScriptFailed {
                name: env.name.clone(),
                action: "onExit".into(),
                reason: format_exit_code(result.exit_code),
            });
        }
        // NOTE(review): the state is not assigned here on success; presumably
        // `drive_action` updates it — TODO confirm.
        result.stdout.clone()
    } else {
        // No onExit action: record an immediately successful action.
        let now = std::time::SystemTime::now();
        self.action.state = Some(ActionState::Success);
        self.action.started_at = Some(now);
        self.action.progress = None;
        self.action.status_message = None;
        self.action.fail_message = None;
        self.action.exit_code = None;
        self.state = if self.ending_only {
            SessionState::ReadyEnding
        } else {
            SessionState::Ready
        };

        log_section_banner(
            &self.session_id,
            &format!("Exiting Environment: {}", env.name),
        );

        self.action.ended_at = Some(std::time::SystemTime::now());

        if let Some(cb) = &self.callback {
            if let Some(status) = self.action_status() {
                cb(&self.session_id, &status);
            }
        }

        String::new()
    };

    Ok(output)
}
1161
/// Runs a step script as a task and returns its final state, exit code and
/// captured stdout.
///
/// A non-success action result is returned as `Ok`; it is up to the caller to
/// inspect `ActionResult::state`.
///
/// # Errors
/// - `SessionError::InvalidState` unless the session is `Ready`.
/// - Errors from building the symbol table, materializing path mapping, or
///   driving the action are propagated.
pub async fn run_task(
    &mut self,
    script: &StepScript,
    task_parameter_values: Option<&openjd_model::types::TaskParameterSet>,
    resolved_symtab: Option<&openjd_expr::SerializedSymbolTable>,
    os_env_vars: Option<&HashMap<String, String>>,
) -> Result<ActionResult, SessionError> {
    if self.state != SessionState::Ready {
        return Err(SessionError::InvalidState {
            expected: vec![SessionState::Ready],
            current: self.state,
        });
    }

    let symtab = self.build_symbol_table(task_parameter_values, resolved_symtab)?;

    // Start tracking the action and notify the observer.
    self.action.reset();
    self.state = SessionState::Running;
    self.notify_callback();

    log_section_banner(&self.session_id, "Running Task");

    // Fresh cancellation plumbing for this action, reachable from `cancel_action`.
    let cancel_token = self.new_action_cancel_token();
    let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
    self.cancel.token = Some(cancel_token.clone());
    self.cancel.request_tx = Some(cancel_tx);

    let env_vars = self.evaluate_env_vars(os_env_vars);
    let mut action_symtab = symtab.clone();
    // NOTE(review): if this fails the function returns with the state still
    // `Running` — confirm that is intended.
    self.materialize_path_mapping(&mut action_symtab)?;
    let action_symtab = Box::new(action_symtab);
    let env_vars = Box::new(env_vars);
    #[allow(unused_mut)]
    let mut runner = StepScriptRunner::new(
        &self.session_id,
        self.working_directory.clone(),
        self.files_directory.clone(),
        self.cross_user.user.clone(),
    )
    .with_redactions(self.redactions_enabled())
    .with_debug_collect_stdout(self.debug_collect_stdout)
    .with_echo_openjd_directives(self.echo_openjd_directives)
    .with_initial_redacted_values(self.redacted_values.iter().cloned().collect())
    .with_cancel_token(cancel_token)
    .with_cancel_request_rx(cancel_rx);
    if let Some(ref hdir) = self.cross_user.helpers_dir {
        runner = runner.with_helpers_directory(hdir.clone());
    }
    // Lend the cross-user helper (and a cloned cancel writer, if cloning
    // succeeds) to the runner for the duration of the action.
    let mut runner = match self.cross_user.helper.take() {
        Some(h) => {
            let r = runner.with_helper(h);
            match self
                .cross_user
                .cancel_writer
                .as_ref()
                .and_then(|f| f.try_clone().ok())
            {
                Some(w) => r.with_cancel_writer(w),
                None => r,
            }
        }
        None => runner,
    };

    // Tasks have no caller-supplied identifier, so one is generated per run.
    let step_identifier = format!("{}:step:{}", self.session_id, uuid::Uuid::new_v4().simple());
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

    let lib = self.library.clone();
    let runner_fut = Box::pin(runner.run(script, &action_symtab, Some(&lib), &env_vars, tx));
    let result = self
        .drive_action(runner_fut, &mut rx, &step_identifier)
        .await;
    // Take the helper back before propagating any error.
    self.cross_user.helper = runner.take_helper();
    let result = result?;

    Ok(ActionResult {
        state: result.state,
        exit_code: result.exit_code,
        stdout: result.stdout,
    })
}
1250
1251 pub async fn run_subprocess(
1257 &mut self,
1258 command: &str,
1259 args: Option<&[String]>,
1260 timeout: Option<Duration>,
1261 os_env_vars: Option<&HashMap<String, String>>,
1262 use_session_env_vars: bool,
1263 log_banner_message: Option<&str>,
1264 ) -> Result<crate::subprocess::SubprocessResult, SessionError> {
1265 if self.state != SessionState::Ready {
1266 return Err(SessionError::InvalidState {
1267 expected: vec![SessionState::Ready],
1268 current: self.state,
1269 });
1270 }
1271 if command.is_empty() || command.trim().is_empty() {
1272 return Err(SessionError::Runtime(
1273 "command must be a non-empty string".into(),
1274 ));
1275 }
1276 if let Some(t) = timeout {
1277 if t.is_zero() {
1278 return Err(SessionError::Runtime("timeout must be positive".into()));
1279 }
1280 }
1281
1282 if let Some(msg) = log_banner_message {
1283 log_section_banner(&self.session_id, msg);
1284 }
1285
1286 self.action.reset();
1287 self.state = SessionState::Running;
1288 self.notify_callback();
1289
1290 let env_vars = if use_session_env_vars {
1291 self.evaluate_env_vars(os_env_vars)
1292 } else {
1293 let mut result: HashMap<String, Option<String>> = self
1294 .process_env
1295 .iter()
1296 .map(|(k, v)| (k.clone(), Some(v.clone())))
1297 .collect();
1298 if let Some(extra) = os_env_vars {
1299 for (k, v) in extra {
1300 result.insert(k.clone(), Some(v.clone()));
1301 }
1302 }
1303 result
1304 };
1305
1306 let mut cmd_args = vec![command.to_string()];
1307 if let Some(a) = args {
1308 cmd_args.extend(a.iter().cloned());
1309 }
1310
1311 if self.cross_user.helper.is_some() {
1313 return self
1314 .run_subprocess_via_helper(&cmd_args, env_vars, timeout)
1315 .await;
1316 }
1317
1318 let cancel_token = self.new_action_cancel_token();
1319 let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
1320 self.cancel.token = Some(cancel_token.clone());
1321 self.cancel.request_tx = Some(cancel_tx);
1322
1323 let config = crate::subprocess::SubprocessConfig {
1324 args: cmd_args,
1325 env_vars,
1326 working_dir: Some(self.working_directory.clone()),
1327 timeout,
1328 user: self.cross_user.user.clone(),
1329 cancel_method: crate::runner::CancelMethod::Terminate,
1330 cancel_request_rx: Some(cancel_rx),
1331 debug_collect_stdout: self.debug_collect_stdout,
1332 };
1333 let mut filter = crate::action_filter::ActionFilter::new(
1334 &self.session_id,
1335 self.echo_openjd_directives,
1336 false,
1337 );
1338 let subprocess_identifier = format!(
1339 "{}:subprocess:{}",
1340 self.session_id,
1341 uuid::Uuid::new_v4().simple()
1342 );
1343 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1344 let sid = self.session_id.clone();
1345 let runner_fut = Box::pin(crate::subprocess::run_subprocess(
1348 config,
1349 &mut filter,
1350 &sid,
1351 tx,
1352 cancel_token,
1353 ));
1354 self.drive_action(runner_fut, &mut rx, &subprocess_identifier)
1355 .await
1356 }
1357
1358 async fn run_subprocess_via_helper(
1364 &mut self,
1365 args: &[String],
1366 env_vars: std::collections::HashMap<String, Option<String>>,
1367 _timeout: Option<Duration>,
1368 ) -> Result<crate::subprocess::SubprocessResult, SessionError> {
1369 let mut filter = crate::action_filter::ActionFilter::new(
1370 &self.session_id,
1371 self.echo_openjd_directives,
1372 false,
1373 );
1374 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1375 let subprocess_identifier = format!(
1376 "{}:subprocess:{}",
1377 self.session_id,
1378 uuid::Uuid::new_v4().simple()
1379 );
1380
1381 let config = crate::subprocess::SubprocessConfig {
1382 args: args.to_vec(),
1383 env_vars,
1384 working_dir: Some(self.working_directory.clone()),
1385 timeout: _timeout,
1386 user: self.cross_user.user.clone(),
1387 cancel_method: crate::runner::CancelMethod::Terminate,
1388 cancel_request_rx: None,
1389 debug_collect_stdout: self.debug_collect_stdout,
1390 };
1391
1392 let helper = self
1393 .cross_user
1394 .helper
1395 .as_mut()
1396 .expect("caller checked helper.is_some()");
1397 let result = run_via_helper(
1398 helper,
1399 &config,
1400 &mut filter,
1401 &self.session_id,
1402 tx,
1403 self.cross_user.cancel_writer.as_ref(),
1404 )
1405 .await;
1406
1407 while let Ok(msg) = rx.try_recv() {
1409 self.apply_message(msg, &subprocess_identifier);
1410 }
1411
1412 let r = result?;
1413 self.action.state = Some(r.state);
1414 self.action.ended_at = Some(std::time::SystemTime::now());
1415 self.action.exit_code = r.exit_code;
1416 self.cancel.token = None;
1417 self.cancel.request_tx = None;
1418 self.cancel.mark_failed = false;
1419 self.state = if r.state == ActionState::Success {
1420 SessionState::Ready
1421 } else {
1422 SessionState::ReadyEnding
1423 };
1424 self.notify_callback();
1425 Ok(r)
1426 }
1427
1428 async fn drive_action<F>(
1434 &mut self,
1435 action_fut: F,
1436 rx: &mut tokio::sync::mpsc::UnboundedReceiver<ActionMessage>,
1437 identifier: &str,
1438 ) -> Result<crate::subprocess::SubprocessResult, SessionError>
1439 where
1440 F: std::future::Future<Output = Result<crate::subprocess::SubprocessResult, SessionError>>,
1441 {
1442 tokio::pin!(action_fut);
1443 let mut result = None;
1444
1445 loop {
1446 tokio::select! {
1447 biased;
1448 msg = rx.recv(), if result.is_none() => {
1449 if let Some(msg) = msg { self.apply_message(msg, identifier) }
1451 }
1452 r = &mut action_fut, if result.is_none() => {
1453 result = Some(r);
1454 }
1455 else => break,
1456 }
1457 if result.is_some() {
1458 while let Ok(msg) = rx.try_recv() {
1460 self.apply_message(msg, identifier);
1461 }
1462 break;
1463 }
1464 }
1465
1466 let r = match result.expect("loop guarantees result is Some") {
1467 Ok(r) => r,
1468 Err(e) => {
1469 self.action.state = Some(ActionState::Failed);
1472 self.action.ended_at = Some(std::time::SystemTime::now());
1473 self.action.exit_code = None;
1474 self.cancel.token = None;
1475 self.cancel.request_tx = None;
1476 self.cancel.mark_failed = false;
1477 self.state = SessionState::ReadyEnding;
1478
1479 if let Some(cb) = &self.callback {
1480 if let Some(status) = self.action_status() {
1481 cb(&self.session_id, &status);
1482 }
1483 }
1484
1485 return Err(e);
1486 }
1487 };
1488
1489 let final_state = if self.cancel.mark_failed && r.state == ActionState::Canceled {
1492 ActionState::Failed
1493 } else {
1494 r.state
1495 };
1496
1497 self.action.state = Some(final_state);
1498 self.action.ended_at = Some(std::time::SystemTime::now());
1499 self.action.exit_code = r.exit_code;
1500 self.cancel.token = None;
1501 self.cancel.request_tx = None;
1502 self.cancel.mark_failed = false;
1503
1504 self.state = if self.ending_only || final_state != ActionState::Success {
1505 SessionState::ReadyEnding
1506 } else {
1507 SessionState::Ready
1508 };
1509
1510 if let Some(cb) = &self.callback {
1511 if let Some(status) = self.action_status() {
1512 cb(&self.session_id, &status);
1513 }
1514 }
1515
1516 Ok(crate::subprocess::SubprocessResult {
1517 state: final_state,
1518 exit_code: r.exit_code,
1519 stdout: r.stdout,
1520 })
1521 }
1522
1523 fn apply_message(&mut self, msg: ActionMessage, identifier: &str) {
1525 match msg {
1526 ActionMessage::Progress(v) => {
1527 self.action.progress = Some(v);
1528 }
1529 ActionMessage::Status(s) => {
1530 self.action.status_message = Some(s);
1531 }
1532 ActionMessage::Fail(s) => {
1533 self.action.fail_message = Some(s);
1534 }
1535 ActionMessage::SetEnv { name, value } => {
1536 let key = normalize_env_key(&name);
1537 self.env_vars.insert(key.clone(), value.clone());
1538 if let Some(changes) = self.created_env_vars.get_mut(identifier) {
1539 changes.insert(key, Some(value));
1540 }
1541 }
1542 ActionMessage::UnsetEnv { name } => {
1543 let key = normalize_env_key(&name);
1544 self.env_vars.remove(&key);
1545 if let Some(changes) = self.created_env_vars.get_mut(identifier) {
1546 changes.insert(key, None);
1547 }
1548 }
1549 ActionMessage::RedactedEnv { name, value } => {
1550 if self.redactions_enabled() {
1551 let key = normalize_env_key(&name);
1552 self.env_vars.insert(key.clone(), value.clone());
1553 if let Some(changes) = self.created_env_vars.get_mut(identifier) {
1554 changes.insert(key, Some(value.clone()));
1555 }
1556 }
1557 self.redacted_values.insert(value);
1558 }
1559 ActionMessage::CancelMarkFailed { fail_message } => {
1560 self.action.fail_message = Some(fail_message);
1561 let _ = self.cancel_action(None, true);
1562 }
1563 }
1564 if let Some(cb) = &self.callback {
1565 if let Some(status) = self.action_status() {
1566 cb(&self.session_id, &status);
1567 }
1568 }
1569 }
1570
1571 fn materialize_path_mapping(&self, symtab: &mut SymbolTable) -> Result<(), SessionError> {
1573 let has_rules = !self.path_mapping_rules.is_empty();
1574 let rules_json = if has_rules {
1575 let rules: Vec<serde_json::Value> = self
1576 .path_mapping_rules
1577 .iter()
1578 .map(|rule| {
1579 serde_json::json!({
1580 "source_path_format": match rule.source_path_format {
1581 openjd_expr::path_mapping::PathFormat::Posix => "POSIX",
1582 openjd_expr::path_mapping::PathFormat::Windows => "WINDOWS",
1583 openjd_expr::path_mapping::PathFormat::Uri => "URI",
1584 },
1585 "source_path": &rule.source_path,
1586 "destination_path": &rule.destination_path,
1587 })
1588 })
1589 .collect();
1590 serde_json::json!({"version": "pathmapping-1.0", "path_mapping_rules": rules})
1591 .to_string()
1592 } else {
1593 serde_json::json!({}).to_string()
1594 };
1595
1596 symtab
1597 .set(
1598 "Session.HasPathMappingRules",
1599 openjd_expr::ExprValue::Bool(has_rules),
1600 )
1601 .map_err(|e| {
1602 SessionError::Runtime(format!("Failed to set HasPathMappingRules: {e}"))
1603 })?;
1604
1605 let filename = self.working_directory.join(format!(
1606 "pathmapping_{}.json",
1607 uuid::Uuid::new_v4().simple()
1608 ));
1609 std::fs::write(&filename, &rules_json).map_err(|e| SessionError::WorkingDirectory {
1610 path: filename.clone(),
1611 source: e,
1612 })?;
1613
1614 symtab
1615 .set(
1616 "Session.PathMappingRulesFile",
1617 openjd_expr::ExprValue::new_path(
1618 filename.to_string_lossy().to_string(),
1619 openjd_expr::path_mapping::PathFormat::host(),
1620 ),
1621 )
1622 .map_err(|e| {
1623 SessionError::Runtime(format!("Failed to set PathMappingRulesFile: {e}"))
1624 })?;
1625
1626 Ok(())
1627 }
1628
1629 pub fn evaluate_env_vars(
1632 &self,
1633 extra: Option<&HashMap<String, String>>,
1634 ) -> HashMap<String, Option<String>> {
1635 let mut result: HashMap<String, Option<String>> = self
1636 .process_env
1637 .iter()
1638 .map(|(k, v)| (normalize_env_key(k), Some(v.clone())))
1639 .collect();
1640 result.insert(
1643 "OPENJD_SESSION_WORKING_DIR".to_string(),
1644 Some(self.working_directory.to_string_lossy().into_owned()),
1645 );
1646 if let Some(extra) = extra {
1647 for (k, v) in extra {
1648 result.insert(normalize_env_key(k), Some(v.clone()));
1649 }
1650 }
1651 for id in &self.environments_entered {
1652 if let Some(changes) = self.created_env_vars.get(id) {
1653 for (name, value) in changes {
1654 result.insert(name.clone(), value.clone());
1655 }
1656 }
1657 }
1658 result
1659 }
1660
1661 pub fn job_parameter_values(&self) -> &JobParameterValues {
1663 &self.job_parameter_values
1664 }
1665
1666 pub fn build_symbol_table(
1674 &self,
1675 task_parameter_values: Option<&openjd_model::types::TaskParameterSet>,
1676 base: Option<&openjd_expr::SerializedSymbolTable>,
1677 ) -> Result<SymbolTable, SessionError> {
1678 use openjd_model::types::TaskParameterType;
1679
1680 let mut symtab = if let Some(base) = base {
1681 let mut s = base
1684 .to_symtab(openjd_expr::path_mapping::PathFormat::host())
1685 .map_err(|e| {
1686 SessionError::Runtime(format!("Failed to deserialize resolved_symtab: {e}"))
1687 })?;
1688 for (name, param) in &self.job_parameter_values {
1692 use openjd_model::types::JobParameterType;
1693 match param.param_type {
1694 JobParameterType::Path => {
1695 let raw = match ¶m.value {
1696 openjd_expr::ExprValue::String(s) => s.as_str(),
1697 openjd_expr::ExprValue::Path { value, .. } => value.as_str(),
1698 _ => continue,
1699 };
1700 let mapped = self.apply_path_mapping_to_string(raw);
1701 let key = format!("Param.{name}");
1702 s.set(
1703 &key,
1704 openjd_expr::ExprValue::new_path(
1705 mapped,
1706 openjd_expr::path_mapping::PathFormat::host(),
1707 ),
1708 )
1709 .map_err(|e| SessionError::Runtime(format!("Failed to set {key}: {e}")))?;
1710 }
1711 JobParameterType::ListPath => {
1712 if let openjd_expr::ExprValue::ListString(ref elements, _) = param.value {
1713 let mapped: Vec<openjd_expr::ExprValue> = elements
1714 .iter()
1715 .map(|s| {
1716 let m = self.apply_path_mapping_to_string(s);
1717 openjd_expr::ExprValue::new_path(
1718 m,
1719 openjd_expr::path_mapping::PathFormat::host(),
1720 )
1721 })
1722 .collect();
1723 let key = format!("Param.{name}");
1724 s.set(
1725 &key,
1726 openjd_expr::ExprValue::make_list(
1727 mapped,
1728 openjd_expr::ExprType::PATH,
1729 )
1730 .unwrap(),
1731 )
1732 .map_err(|e| {
1733 SessionError::Runtime(format!("Failed to set {key}: {e}"))
1734 })?;
1735 }
1736 }
1737 _ => {}
1738 }
1739 }
1740 s
1741 } else {
1742 let mut s = SymbolTable::new();
1743 for (name, param) in &self.job_parameter_values {
1744 let raw_key = format!("RawParam.{name}");
1745 s.set(&raw_key, param.value.clone())
1746 .map_err(|e| SessionError::Runtime(format!("Failed to set {raw_key}: {e}")))?;
1747 let key = format!("Param.{name}");
1748 use openjd_model::types::JobParameterType;
1749 let mapped_value = match param.param_type {
1750 JobParameterType::Path => {
1751 let raw = match ¶m.value {
1752 openjd_expr::ExprValue::String(s) => s.as_str(),
1753 openjd_expr::ExprValue::Path { value, .. } => value.as_str(),
1754 _ => "",
1755 };
1756 let mapped = self.apply_path_mapping_to_string(raw);
1757 openjd_expr::ExprValue::new_path(
1758 mapped,
1759 openjd_expr::path_mapping::PathFormat::host(),
1760 )
1761 }
1762 JobParameterType::ListPath => match ¶m.value {
1763 openjd_expr::ExprValue::ListString(elements, _) => {
1764 let mapped: Vec<openjd_expr::ExprValue> = elements
1765 .iter()
1766 .map(|s| {
1767 let m = self.apply_path_mapping_to_string(s);
1768 openjd_expr::ExprValue::new_path(
1769 m,
1770 openjd_expr::path_mapping::PathFormat::host(),
1771 )
1772 })
1773 .collect();
1774 openjd_expr::ExprValue::make_list(mapped, openjd_expr::ExprType::PATH)
1775 .unwrap()
1776 }
1777 other => self.apply_path_mapping_to_value(other),
1778 },
1779 _ => self.apply_path_mapping_to_value(¶m.value),
1780 };
1781 s.set(&key, mapped_value)
1782 .map_err(|e| SessionError::Runtime(format!("Failed to set {key}: {e}")))?;
1783 }
1784 s
1785 };
1786
1787 let host_path_format = openjd_expr::path_mapping::PathFormat::host();
1788 symtab
1789 .set(
1790 "Session.WorkingDirectory",
1791 openjd_expr::ExprValue::new_path(
1792 self.working_directory.to_string_lossy().to_string(),
1793 host_path_format,
1794 ),
1795 )
1796 .map_err(|e| SessionError::Runtime(format!("Failed to set WorkingDirectory: {e}")))?;
1797
1798 if let Some(task_params) = task_parameter_values {
1799 for (name, tv) in task_params {
1800 let raw_key = format!("Task.RawParam.{name}");
1802 symtab
1803 .set(&raw_key, tv.value.clone())
1804 .map_err(|e| SessionError::Runtime(format!("Failed to set {raw_key}: {e}")))?;
1805
1806 let key = format!("Task.Param.{name}");
1808 let param_value = match tv.param_type {
1809 TaskParameterType::Path => {
1810 let s = match &tv.value {
1811 openjd_expr::ExprValue::String(s) => s.as_str(),
1812 openjd_expr::ExprValue::Path { value, .. } => value.as_str(),
1813 _ => "",
1814 };
1815 let mapped = self.apply_path_mapping_to_string(s);
1816 openjd_expr::ExprValue::new_path(
1817 mapped,
1818 openjd_expr::path_mapping::PathFormat::host(),
1819 )
1820 }
1821 _ => tv.value.clone(),
1822 };
1823 symtab
1824 .set(&key, param_value)
1825 .map_err(|e| SessionError::Runtime(format!("Failed to set {key}: {e}")))?;
1826 }
1827 }
1828
1829 Ok(symtab)
1830 }
1831
1832 fn apply_path_mapping_to_string(&self, path: &str) -> String {
1834 for rule in self.path_mapping_rules.iter() {
1835 if let Some(mapped) = rule.apply(path) {
1836 return mapped;
1837 }
1838 }
1839 path.to_string()
1840 }
1841
1842 fn apply_path_mapping_to_value(
1844 &self,
1845 value: &openjd_expr::ExprValue,
1846 ) -> openjd_expr::ExprValue {
1847 match value {
1848 openjd_expr::ExprValue::Path {
1849 value: path_str,
1850 format,
1851 ..
1852 } => {
1853 for rule in self.path_mapping_rules.iter() {
1854 if let Some(mapped) = rule.apply(path_str) {
1855 return openjd_expr::ExprValue::new_path(mapped, *format);
1856 }
1857 }
1858 value.clone()
1859 }
1860 openjd_expr::ExprValue::ListPath(elements, fmt, _) => {
1861 let mapped: Vec<openjd_expr::ExprValue> = elements
1862 .iter()
1863 .map(|s| {
1864 let mapped_s =
1865 openjd_expr::path_mapping::apply_rules(&self.path_mapping_rules, s);
1866 openjd_expr::ExprValue::new_path(mapped_s, *fmt)
1867 })
1868 .collect();
1869 openjd_expr::ExprValue::make_list(mapped, openjd_expr::ExprType::PATH).unwrap()
1870 }
1871 _ => value.clone(),
1872 }
1873 }
1874}
1875
1876impl Drop for Session {
1877 fn drop(&mut self) {
1878 if !self.cleanup_called {
1879 log::warn!(
1884 target: "openjd.sessions",
1885 "Session '{}' was dropped without calling cleanup(). \
1886 Working directory may not have been cleaned up.",
1887 self.session_id
1888 );
1889 if !self.retain_working_dir {
1890 let _ = std::fs::remove_dir_all(&self.working_directory);
1891 }
1892 }
1893 }
1894}