Skip to main content

openjd_sessions/
session.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// Copyright by contributors to this project.
3// SPDX-License-Identifier: (Apache-2.0 OR MIT)
4
5//! Session management — core state machine.
6//!
7//! # Session identifiers
8//!
9//! A [`Session`] is identified by an opaque `session_id: String` supplied by
10//! the caller (see [`SessionConfig::session_id`]). This identifier is **a
11//! correlation key for log messages**, not a credential:
12//!
13//! - It is deliberately included in log output so operators can trace which
14//!   log lines belong to which session, as both a formatted message component
15//!   and a structured log field (`session_id = ...`).
16//! - It does not authenticate or authorize anything. The OpenJD sessions
17//!   runtime does not consume session IDs as bearer tokens or secrets.
18//! - It has no meaning outside the process that created the session; the
19//!   runtime does not persist it to any shared store.
20//! - Auto-generated values (when the runtime needs to fabricate one) are
21//!   `<caller-id>:<uuid>` — random, but not secret.
22//!
23//! Static analyzers that flag "session_id" under a "cleartext logging of
24//! sensitive information" rule are producing false positives for this crate.
25//! The rule's heuristic assumes web-application session cookies, which is not
26//! the concept modeled here.
27
28use std::collections::{HashMap, HashSet};
29use std::path::{Path, PathBuf};
30use std::sync::Arc;
31use std::time::Duration;
32
33use openjd_expr::function_library::FunctionLibrary;
34use openjd_expr::path_mapping::PathMappingRule;
35use openjd_model::job::{Environment, StepScript};
36use openjd_model::symbol_table::SymbolTable;
37use openjd_model::types::JobParameterValues;
38use tokio_util::sync::CancellationToken;
39
40use crate::action::{ActionMessage, ActionResult, ActionState};
41use crate::action_status::ActionStatus;
42use crate::cross_user_helper::run_via_helper;
43
/// Default notify period (in seconds) for `NOTIFY_THEN_TERMINATE` cancel when
/// no explicit time_limit is provided. Matches the OpenJD spec default.
///
/// `Session::cancel_action` falls back to this value when it builds the
/// `NOTIFY_THEN_TERMINATE` command for the cross-user helper and the caller
/// passed `time_limit = None`.
pub const DEFAULT_CANCEL_NOTIFY_PERIOD_SECS: u64 = 5;
47#[cfg(unix)]
48use crate::cross_user_helper::CrossUserHelper;
49#[cfg(windows)]
50use crate::cross_user_helper::CrossUserHelperWin;
51use crate::error::SessionError;
52use crate::logging::{log_section_banner, LogContent};
53use crate::runner::env_script::EnvironmentScriptRunner;
54use crate::runner::step_script::StepScriptRunner;
55use crate::session_log;
56use crate::session_user::SessionUser;
57
/// Session lifecycle state.
///
/// The [`Display`](std::fmt::Display) form is the upper-case,
/// underscore-separated state name (e.g. `READY_ENDING`).
///
/// ```
/// use openjd_sessions::SessionState;
///
/// let state = SessionState::Ready;
/// assert_eq!(format!("{state}"), "READY");
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionState {
    /// State of a freshly constructed session.
    Ready,
    /// An action is in progress; the only state in which `cancel_action` is accepted.
    Running,
    /// Entered by `cancel_action`.
    Canceling,
    ReadyEnding,
    Ended,
}

impl std::fmt::Display for SessionState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Ready => "READY",
            Self::Running => "RUNNING",
            Self::Canceling => "CANCELING",
            Self::ReadyEnding => "READY_ENDING",
            Self::Ended => "ENDED",
        };
        f.write_str(label)
    }
}
86
/// Identifier for an environment within a session.
///
/// Used as the key for the session's environment bookkeeping (the
/// `environments` and `created_env_vars` maps and the `environments_entered`
/// list on [`Session`]).
pub type EnvironmentIdentifier = String;

/// Callback invoked when action status changes.
///
/// The session calls it with its own session ID as the first argument and a
/// snapshot of the current action's [`ActionStatus`] as the second (see
/// `Session::notify_callback`). The callback must be `Send + Sync`.
pub type SessionCallbackType = Box<dyn Fn(&str, &ActionStatus) + Send + Sync>;
92
/// Configuration for creating a new Session.
///
/// Consumed by [`Session::with_config`].
pub struct SessionConfig {
    /// Opaque correlation identifier for the session (see the module docs).
    /// It is passed to `TempDir::new` when creating the working directory
    /// (presumably as its name prefix) and attached to the session's log
    /// messages.
    pub session_id: String,
    /// Job parameter values; stored on the session as given.
    pub job_parameter_values: JobParameterValues,
    /// Path mapping rules. `None` is treated as an empty rule set. The rules
    /// are re-ordered so that longer `source_path`s come first, then
    /// registered with the session's expression function library.
    pub path_mapping_rules: Option<Vec<PathMappingRule>>,
    /// When `true`, `Session::cleanup` does not delete the working directory.
    pub retain_working_dir: bool,
    /// Optional callback notified of action status changes.
    pub callback: Option<SessionCallbackType>,
    /// Stored as the session's process environment; `None` becomes an empty
    /// map.
    pub os_env_vars: Option<HashMap<String, String>>,
    /// Directory in which the session's working directory is created. When
    /// `None`, `crate::tempdir::openjd_temp_dir(None)` is used.
    pub session_root_directory: Option<PathBuf>,
    /// The session user. When set to a user for which `is_process_user()` is
    /// false, `Session::with_config` creates a helpers directory and spawns a
    /// cross-user helper process.
    pub user: Option<Arc<dyn SessionUser>>,
    /// Revision + extensions profile that drives expression-function
    /// availability and redaction behaviour. Sessions do not use
    /// caller-policy limits, so a [`ModelProfile`](openjd_model::ModelProfile)
    /// is the right shape here — not a full
    /// [`ValidationContext`](openjd_model::ValidationContext).
    pub profile: Option<openjd_model::ModelProfile>,
    /// Optional external cancellation token. When cancelled, all running and
    /// future actions will be cancelled via the spec's cancellation sequence.
    pub cancel_token: Option<CancellationToken>,
    /// Controls behavior when a parent directory of the session root is
    /// world-writable without the sticky bit set (POSIX only).
    /// Defaults to `Strict` (fail-closed). Has no effect on Windows.
    pub sticky_bit_policy: crate::tempdir::StickyBitPolicy,
    /// Whether to accumulate subprocess stdout into result strings.
    /// Intended for debugging only — production callers should leave this
    /// `false` and observe output through the real-time callback instead.
    /// Default is `false` — output is still streamed through the callback in
    /// real time, but `ActionResult.stdout` and similar fields stay empty.
    pub debug_collect_stdout: bool,
    /// Whether to echo `openjd_*` directive lines (e.g. `openjd_progress`,
    /// `openjd_status`, `openjd_env`, `openjd_redacted_env`, …) from
    /// subprocess stdout to the session log.
    ///
    /// Default is `true`, matching the Python `openjd-sessions` reference
    /// implementation. When `false`, recognised directives are still parsed
    /// and acted on (progress, status, env-var changes, redacted-value
    /// registration, …) but the directive lines themselves are filtered
    /// out of the log stream.
    ///
    /// **Redaction interaction**: regardless of this flag, values from
    /// `openjd_redacted_env` directives are added to the session's redaction
    /// set *before* the originating line would be passed through, so when
    /// `echo_openjd_directives = true` the directive line that introduces a
    /// secret is still redacted (`NAME=********`) before reaching the log.
    /// Subsequent occurrences of the secret in any log line are also
    /// redacted. Setting this flag to `false` does not improve security —
    /// it just removes the directive lines from operator-facing output.
    pub echo_openjd_directives: bool,
}
142
/// Render an optional process exit code as text.
///
/// `Some(c)` becomes `"exit code: c"`; `None` (no exit code available)
/// becomes `"exit code: N/A"`.
fn format_exit_code(code: Option<i32>) -> String {
    code.map_or_else(
        || "exit code: N/A".to_string(),
        |c| format!("exit code: {c}"),
    )
}
149
/// Canonicalise an environment variable name for the host platform.
///
/// Windows compares environment variable names case-insensitively, so there
/// the name is upper-cased to keep mixed-case duplicates from reaching the
/// Win32 API. On every other platform the name is returned unchanged.
fn normalize_env_key(name: &str) -> String {
    if cfg!(windows) {
        name.to_uppercase()
    } else {
        name.to_owned()
    }
}
163
/// Tracks env var changes made during an environment's lifecycle.
/// Uses a HashMap for automatic deduplication — last write wins,
/// matching Python's SimplifiedEnvironmentVariableChanges dict.
/// `None` value means "unset this variable".
///
/// Keys are the names of the variables being changed. The session keeps one
/// of these maps per environment (`Session::created_env_vars`, keyed by
/// [`EnvironmentIdentifier`]).
type EnvVarChanges = HashMap<String, Option<String>>;
169
170/// Tracks the status of the currently running (or most recently completed) action.
171struct ActionStatusFields {
172    state: Option<ActionState>,
173    progress: Option<f64>,
174    status_message: Option<String>,
175    fail_message: Option<String>,
176    exit_code: Option<i32>,
177    started_at: Option<std::time::SystemTime>,
178    ended_at: Option<std::time::SystemTime>,
179}
180
181impl ActionStatusFields {
182    fn new() -> Self {
183        Self {
184            state: None,
185            progress: None,
186            status_message: None,
187            fail_message: None,
188            exit_code: None,
189            started_at: None,
190            ended_at: None,
191        }
192    }
193
194    /// Reset all fields for a new action.
195    fn reset(&mut self) {
196        self.state = Some(ActionState::Running);
197        self.started_at = Some(std::time::SystemTime::now());
198        self.ended_at = None;
199        self.progress = None;
200        self.status_message = None;
201        self.fail_message = None;
202        self.exit_code = None;
203    }
204}
205
206/// Cancellation state for the current action and external cancellation support.
207struct CancelFields {
208    /// Token for the current action (cancelled to abort the subprocess).
209    token: Option<CancellationToken>,
210    /// Channel to send cancel requests (with optional time limit) to the subprocess.
211    request_tx: Option<tokio::sync::watch::Sender<Option<Duration>>>,
212    /// When true, a Canceled action result is reported as Failed.
213    mark_failed: bool,
214    /// External cancellation token from the caller; action tokens are children of this.
215    parent_token: Option<CancellationToken>,
216}
217
218impl CancelFields {
219    fn new(parent_token: Option<CancellationToken>) -> Self {
220        Self {
221            token: None,
222            request_tx: None,
223            mark_failed: false,
224            parent_token,
225        }
226    }
227}
228
229/// Cross-user execution state.
230struct CrossUserFields {
231    user: Option<Arc<dyn SessionUser>>,
232    /// Process-user-only directory for helper binary and wrapper scripts.
233    helpers_dir: Option<PathBuf>,
234    #[cfg(unix)]
235    helper: Option<CrossUserHelper>,
236    #[cfg(windows)]
237    helper: Option<CrossUserHelperWin>,
238    cancel_writer: Option<std::fs::File>,
239    /// Shared auth token for the helper process. `Some` whenever `helper` is
240    /// `Some`; `None` for same-user sessions. Kept separate from the helper
241    /// struct because the cancel path needs to access it even when the
242    /// helper has been moved into a runner.
243    helper_auth_token: Option<String>,
244}
245
246/// Build the session's function library from the session's
247/// [`ModelProfile`](openjd_model::ModelProfile) (or the current default
248/// expr profile when none is set), with the current path-mapping `rules`
249/// registered as host context.
250///
251/// Called whenever either the profile or the rules change so that
252/// `apply_path_mapping` always reflects the session's current rules.
253fn derive_library(
254    profile: Option<&openjd_model::ModelProfile>,
255    rules: &Arc<Vec<PathMappingRule>>,
256) -> Arc<FunctionLibrary> {
257    let host = openjd_expr::HostContext::WithRules(rules.clone());
258    let expr_profile = match profile {
259        Some(p) => p.to_expr_profile(host),
260        None => openjd_expr::ExprProfile::current().with_host_context(host),
261    };
262    openjd_expr::FunctionLibrary::for_profile(&expr_profile)
263}
264
265pub struct Session {
266    session_id: String,
267    state: SessionState,
268    ending_only: bool,
269    working_directory: PathBuf,
270    files_directory: PathBuf,
271    retain_working_dir: bool,
272    cleanup_called: bool,
273    // TempDir ownership (only when created via with_config)
274    _working_dir: Option<crate::tempdir::TempDir>,
275    _files_dir: Option<crate::tempdir::TempDir>,
276    // Environment tracking
277    environments: HashMap<EnvironmentIdentifier, Environment>,
278    environments_entered: Vec<EnvironmentIdentifier>,
279    // Env var tracking
280    env_vars: HashMap<String, String>,
281    process_env: HashMap<String, String>,
282    created_env_vars: HashMap<EnvironmentIdentifier, EnvVarChanges>,
283    // Expression evaluation
284    //
285    // `library` is the cached derived library, built from the session's
286    // `library` is the cached derived library, built from the session's
287    // `profile` (an optional `ModelProfile`) plus the current
288    // `path_mapping_rules`. Whenever either input changes, the library
289    // must be rebuilt via `derive_library`.
290    library: Arc<FunctionLibrary>,
291    path_mapping_rules: Arc<Vec<PathMappingRule>>,
292    job_parameter_values: JobParameterValues,
293    // Grouped fields
294    action: ActionStatusFields,
295    cancel: CancelFields,
296    cross_user: CrossUserFields,
297    // Callback
298    callback: Option<SessionCallbackType>,
299    // Redaction
300    redacted_values: HashSet<String>,
301    profile: Option<openjd_model::ModelProfile>,
302    debug_collect_stdout: bool,
303    /// Whether to echo `openjd_*` directive lines to the log. See
304    /// [`SessionConfig::echo_openjd_directives`].
305    echo_openjd_directives: bool,
306}
307
308impl Session {
309    /// Test-only constructor that accepts a pre-existing working directory.
310    /// Production code should use `Session::with_config`.
311    #[cfg(any(test, feature = "test-utils"))]
312    pub fn new_for_test(working_directory: PathBuf) -> Self {
313        let files_directory = working_directory.join("embedded_files");
314        Self {
315            session_id: String::new(),
316            state: SessionState::Ready,
317            ending_only: false,
318            working_directory,
319            files_directory,
320            retain_working_dir: false,
321            cleanup_called: false,
322            _working_dir: None,
323            _files_dir: None,
324            environments: HashMap::new(),
325            environments_entered: Vec::new(),
326            env_vars: HashMap::new(),
327            process_env: HashMap::new(),
328            created_env_vars: HashMap::new(),
329            library: derive_library(None, &Arc::new(Vec::new())),
330            path_mapping_rules: Arc::new(Vec::new()),
331            job_parameter_values: HashMap::new(),
332            action: ActionStatusFields::new(),
333            cancel: CancelFields::new(None),
334            cross_user: CrossUserFields {
335                user: None,
336                helpers_dir: None,
337                helper: None,
338                cancel_writer: None,
339                helper_auth_token: None,
340            },
341            callback: None,
342            redacted_values: HashSet::new(),
343            profile: None,
344            debug_collect_stdout: true, // test constructor — tests need captured stdout
345            echo_openjd_directives: true, // matches default in production config
346        }
347    }
348
349    /// Test-only: inject a cancel_writer (e.g. the write end of a pipe) so a test
350    /// can observe what `cancel_action` writes.
351    #[cfg(any(test, feature = "test-utils"))]
352    pub fn set_cancel_writer_for_test(&mut self, writer: std::fs::File) {
353        self.cross_user.cancel_writer = Some(writer);
354    }
355
356    /// Test-only: inject a `helper_auth_token` so `cancel_action` writes a
357    /// tokenized cancel JSON object. Pairs with `set_cancel_writer_for_test`
358    /// to exercise the token-in-cancel-JSON path without a real helper.
359    #[cfg(any(test, feature = "test-utils"))]
360    pub fn set_helper_auth_token_for_test(&mut self, token: String) {
361        self.cross_user.helper_auth_token = Some(token);
362    }
363
364    /// Test-only: force the session state (typically to `Running`) so `cancel_action`
365    /// will proceed without needing a full action run.
366    #[cfg(any(test, feature = "test-utils"))]
367    pub fn set_state_for_test(&mut self, state: SessionState) {
368        self.state = state;
369    }
370
371    /// Full constructor from SessionConfig.
372    pub fn with_config(mut config: SessionConfig) -> Result<Self, SessionError> {
373        let root_dir = match &config.session_root_directory {
374            Some(d) => d.clone(),
375            None => crate::tempdir::openjd_temp_dir(None)?,
376        };
377
378        #[cfg(unix)]
379        {
380            use crate::tempdir::StickyBitPolicy;
381            match config.sticky_bit_policy {
382                StickyBitPolicy::Strict => {
383                    if let Some(path) = crate::tempdir::find_missing_sticky_bit(&root_dir) {
384                        return Err(SessionError::PathPermissions {
385                            path: path.display().to_string(),
386                            reason: format!(
387                                "Directory is world-writable without the sticky bit set. \
388                                 This allows other users to modify or delete session files. \
389                                 Set the sticky bit (chmod +t {}) or use \
390                                 StickyBitPolicy::Warn to override.",
391                                path.display()
392                            ),
393                        });
394                    }
395                }
396                StickyBitPolicy::Warn => {
397                    if let Some(path) = crate::tempdir::find_missing_sticky_bit(&root_dir) {
398                        log::warn!(
399                            target: "openjd.sessions",
400                            "Sticky bit is not set on {}. This may pose a risk when running \
401                             work on this host as users may modify or delete files in this \
402                             directory which do not belong to them.",
403                            path.display()
404                        );
405                    }
406                }
407                StickyBitPolicy::Disabled => {}
408            }
409        }
410
411        let working_dir = crate::tempdir::TempDir::new(
412            Some(&root_dir),
413            Some(&config.session_id),
414            config.user.as_deref(),
415        )?;
416        let files_dir = crate::tempdir::TempDir::new(
417            Some(working_dir.path()),
418            Some("embedded_files"),
419            config.user.as_deref(),
420        )?;
421
422        let working_directory = working_dir.path().to_path_buf();
423        let files_directory = files_dir.path().to_path_buf();
424
425        let mut path_mapping_rules = config.path_mapping_rules.unwrap_or_default();
426        path_mapping_rules.sort_by_key(|r| std::cmp::Reverse(r.source_path.len()));
427        let path_mapping_rules = Arc::new(path_mapping_rules);
428        let profile = config.profile.take();
429        let library = derive_library(profile.as_ref(), &path_mapping_rules);
430        let process_env = config.os_env_vars.unwrap_or_default();
431
432        // Create helpers directory and spawn cross-user helper if needed.
433        // The helpers directory is 0o750 (owner rwx, group r-x) so the job user
434        // can traverse and execute but cannot create or modify files.
435        let mut helpers_dir = None;
436
437        #[cfg(unix)]
438        let (helper, cancel_writer, helper_auth_token) = if let Some(ref user) = config.user {
439            if !user.is_process_user() {
440                let hdir = crate::helper_binary::create_helpers_dir(
441                    &working_directory,
442                    Some(user.as_ref()),
443                )?;
444                let helper_path = crate::helper_binary::write_helper(&hdir, user.as_ref())?;
445                let (h, cw) = CrossUserHelper::spawn(&helper_path, user.as_ref())?;
446                let token = h.auth_token().to_string();
447                helpers_dir = Some(hdir);
448                (Some(h), Some(cw), Some(token))
449            } else {
450                (None, None, None)
451            }
452        } else {
453            (None, None, None)
454        };
455
456        #[cfg(windows)]
457        let (helper, cancel_writer, helper_auth_token) = if let Some(ref user) = config.user {
458            if !user.is_process_user() {
459                let hdir = crate::helper_binary::create_helpers_dir(
460                    &working_directory,
461                    Some(user.as_ref()),
462                )?;
463                let helper_path = crate::helper_binary::write_helper(&hdir, user.as_ref())?;
464                let (h, cw) = CrossUserHelperWin::spawn(&helper_path, user.as_ref())?;
465                let token = h.auth_token().to_string();
466                helpers_dir = Some(hdir);
467                (Some(h), Some(cw), Some(token))
468            } else {
469                (None, None, None)
470            }
471        } else {
472            (None, None, None)
473        };
474
475        // Log host info (mirrors Python Session.__init__)
476        session_log!(
477            info,
478            &config.session_id,
479            LogContent::HOST_INFO,
480            "openjd-sessions Library Version: {}",
481            env!("CARGO_PKG_VERSION")
482        );
483        session_log!(
484            info,
485            &config.session_id,
486            LogContent::HOST_INFO,
487            "Platform: {}",
488            std::env::consts::OS
489        );
490        session_log!(
491            info,
492            &config.session_id,
493            LogContent::HOST_INFO,
494            "Architecture: {}",
495            std::env::consts::ARCH
496        );
497        // `session_id` is an opaque correlation identifier, not a secret —
498        // it is emitted both as a structured field and in the message text so
499        // log consumers can associate this line with the rest of the session.
500        // See the module-level docs for rationale.
501        log::info!(target: "openjd.sessions", session_id = config.session_id.as_str(); "Initializing Open Job Description Session: {}", &config.session_id);
502        session_log!(
503            info,
504            &config.session_id,
505            LogContent::FILE_PATH,
506            "Session Working Directory: {}",
507            working_directory.display()
508        );
509        session_log!(
510            info,
511            &config.session_id,
512            LogContent::FILE_PATH,
513            "Session's Embedded Files Directory: {}",
514            files_directory.display()
515        );
516
517        Ok(Self {
518            session_id: config.session_id,
519            state: SessionState::Ready,
520            ending_only: false,
521            working_directory,
522            files_directory,
523            retain_working_dir: config.retain_working_dir,
524            cleanup_called: false,
525            _working_dir: Some(working_dir),
526            _files_dir: Some(files_dir),
527            environments: HashMap::new(),
528            environments_entered: Vec::new(),
529            env_vars: HashMap::new(),
530            process_env,
531            created_env_vars: HashMap::new(),
532            library,
533            path_mapping_rules,
534            job_parameter_values: config.job_parameter_values,
535            action: ActionStatusFields::new(),
536            cancel: CancelFields::new(config.cancel_token),
537            cross_user: CrossUserFields {
538                user: config.user,
539                helpers_dir,
540                helper,
541                cancel_writer,
542                helper_auth_token,
543            },
544            callback: config.callback,
545            redacted_values: HashSet::new(),
546            profile,
547            debug_collect_stdout: config.debug_collect_stdout,
548            echo_openjd_directives: config.echo_openjd_directives,
549        })
550    }
551
552    pub fn with_path_mapping(mut self, mut rules: Vec<PathMappingRule>) -> Self {
553        rules.sort_by_key(|r| std::cmp::Reverse(r.source_path.len()));
554        self.path_mapping_rules = Arc::new(rules);
555        self.library = derive_library(self.profile.as_ref(), &self.path_mapping_rules);
556        self
557    }
558
559    /// Extend the session's path mapping rules with additional rules.
560    /// Rules are re-sorted by source path length (longest first) after extending.
561    pub fn extend_path_mapping_rules(&mut self, additional: Vec<PathMappingRule>) {
562        let mut rules = (*self.path_mapping_rules).clone();
563        rules.extend(additional);
564        rules.sort_by_key(|r| std::cmp::Reverse(r.source_path.len()));
565        self.path_mapping_rules = Arc::new(rules);
566        self.library = derive_library(self.profile.as_ref(), &self.path_mapping_rules);
567    }
568
569    /// Get the current path mapping rules.
570    pub fn path_mapping_rules(&self) -> &[PathMappingRule] {
571        &self.path_mapping_rules
572    }
573
574    /// Set the [`ModelProfile`](openjd_model::ModelProfile) that drives
575    /// which expression functions and signatures are available to this
576    /// session and which redaction rules apply. Rebuilds the session's
577    /// derived function library so subsequent expression evaluation
578    /// sees the new profile.
579    pub fn with_profile(mut self, profile: openjd_model::ModelProfile) -> Self {
580        self.profile = Some(profile);
581        self.library = derive_library(self.profile.as_ref(), &self.path_mapping_rules);
582        self
583    }
584
585    /// Check whether redacted env vars are enabled, mirroring Python's `_redactions_enabled()`.
586    /// True if spec revision > v2023_09 OR "REDACTED_ENV_VARS" extension is present.
587    fn redactions_enabled(&self) -> bool {
588        match &self.profile {
589            Some(p) => {
590                p.revision() > openjd_model::types::SpecificationRevision::V2023_09
591                    || p.has_extension(openjd_model::types::ModelExtension::RedactedEnvVars)
592            }
593            None => false,
594        }
595    }
596
597    fn lib(&self) -> Option<&FunctionLibrary> {
598        Some(&self.library)
599    }
600
601    /// Fire the callback with the current action status.
602    /// Called when transitioning to RUNNING so callers see the state change immediately.
603    fn notify_callback(&self) {
604        if let Some(cb) = &self.callback {
605            if let Some(status) = self.action_status() {
606                cb(&self.session_id, &status);
607            }
608        }
609    }
610
611    // --- Properties ---
612
613    /// Returns the list of enabled extensions for this session.
614    /// If no profile is set, returns an empty vec.
615    pub fn get_enabled_extensions(&self) -> Vec<String> {
616        match &self.profile {
617            Some(p) => {
618                let mut exts: Vec<String> = p
619                    .extensions()
620                    .iter()
621                    .map(|e| e.as_str().to_string())
622                    .collect();
623                exts.sort();
624                exts
625            }
626            None => Vec::new(),
627        }
628    }
629
630    pub fn session_id(&self) -> &str {
631        &self.session_id
632    }
633    pub fn state(&self) -> SessionState {
634        self.state
635    }
636    pub fn working_directory(&self) -> &Path {
637        &self.working_directory
638    }
639    pub fn files_directory(&self) -> &Path {
640        &self.files_directory
641    }
642    pub fn environments_entered(&self) -> &[EnvironmentIdentifier] {
643        &self.environments_entered
644    }
645
646    /// Clone the cancel_writer file handle, if one exists.
647    /// Used by Python bindings to send cancel commands when the session is
648    /// taken by a background thread.
649    pub fn clone_cancel_writer(&self) -> Option<std::fs::File> {
650        self.cross_user
651            .cancel_writer
652            .as_ref()
653            .and_then(|f| f.try_clone().ok())
654    }
655
656    /// Get the current action status, if any action has been run.
657    pub fn action_status(&self) -> Option<ActionStatus> {
658        self.action.state.map(|state| ActionStatus {
659            state,
660            progress: self.action.progress,
661            status_message: self.action.status_message.clone(),
662            fail_message: self.action.fail_message.clone(),
663            exit_code: self.action.exit_code,
664            started_at: self.action.started_at,
665            ended_at: self.action.ended_at,
666        })
667    }
668
669    /// Override the action state. Used by the pyo3 binding to convert
670    /// Failed → Canceled when cancel was requested externally.
671    pub fn override_action_state(&mut self, state: ActionState) {
672        self.action.state = Some(state);
673    }
674
675    /// Redact sensitive values from output text.
676    pub fn redact(&self, text: &str) -> String {
677        // Sort by length descending so longer matches are replaced first,
678        // preventing partial replacements when values overlap (e.g. "FOOBAR" and "BAR").
679        let mut vals: Vec<&str> = self.redacted_values.iter().map(|s| s.as_str()).collect();
680        vals.sort_by_key(|s| std::cmp::Reverse(s.len()));
681        let mut result = text.to_string();
682        for val in vals {
683            result = result.replace(val, "********");
684        }
685        result
686    }
687
688    /// Create a cancellation token for an action. If a parent token was
689    /// provided at session construction, the action token is a child of it
690    /// so that cancelling the parent cascades to all actions.
691    fn new_action_cancel_token(&self) -> CancellationToken {
692        match &self.cancel.parent_token {
693            Some(parent) => parent.child_token(),
694            None => CancellationToken::new(),
695        }
696    }
697
698    /// Cancel the currently running async action.
699    pub fn cancel_action(
700        &mut self,
701        time_limit: Option<Duration>,
702        mark_action_failed: bool,
703    ) -> Result<(), SessionError> {
704        if self.state != SessionState::Running {
705            return Err(SessionError::InvalidState {
706                expected: vec![SessionState::Running],
707                current: self.state,
708            });
709        }
710        self.state = SessionState::Canceling;
711        if mark_action_failed {
712            self.cancel.mark_failed = true;
713        }
714
715        // Send cancel to the helper process via the dup'd stdin fd.
716        //
717        // Same-user sessions have no helper, so cancel_writer is None — in
718        // those sessions there's nothing to write. When a cancel_writer is
719        // present but no helper token is stored (e.g. an in-test injection
720        // via `set_cancel_writer_for_test`), the test is responsible for
721        // asserting whatever framing it expects; we still write a valid
722        // tokenized command if we have a token.
723        if let Some(ref mut writer) = self.cross_user.cancel_writer {
724            use std::io::Write;
725            let is_terminate = matches!(time_limit, Some(d) if d.is_zero());
726            let token_field = match &self.cross_user.helper_auth_token {
727                Some(t) => format!(r#""token":"{t}","#),
728                None => String::new(),
729            };
730            let cmd = if is_terminate {
731                format!(r#"{{{token_field}"cancel":"TERMINATE"}}"#)
732            } else {
733                let notify_period = time_limit
734                    .unwrap_or(Duration::from_secs(DEFAULT_CANCEL_NOTIFY_PERIOD_SECS))
735                    .as_secs();
736                format!(
737                    r#"{{{token_field}"cancel":"NOTIFY_THEN_TERMINATE","notifyPeriodInSeconds":{notify_period}}}"#
738                )
739            };
740            let _ = writer.write_all(cmd.as_bytes());
741            let _ = writer.write_all(b"\n");
742            let _ = writer.flush();
743        }
744
745        if let Some(tx) = &self.cancel.request_tx {
746            let _ = tx.send(time_limit);
747        }
748        if let Some(token) = &self.cancel.token {
749            token.cancel();
750        }
751        Ok(())
752    }
753
754    /// Clean up the session. Deletes working directory if not retained.
755    pub fn cleanup(&mut self) {
756        if self.cleanup_called {
757            return;
758        }
759        self.cleanup_called = true;
760
761        if !self.retain_working_dir {
762            log_section_banner(&self.session_id, "Session Cleanup");
763
764            // Shut down the cross-user helper before deleting the working directory
765            if let Some(ref mut helper) = self.cross_user.helper {
766                helper.shutdown();
767            }
768
769            session_log!(
770                info,
771                &self.session_id,
772                LogContent::FILE_PATH,
773                "Deleting working directory: {}",
774                self.working_directory.display()
775            );
776
777            // Cross-user cleanup: delete files owned by session user first,
778            // since files created via sudo may not be deletable by the process owner.
779            #[cfg(unix)]
780            if let Some(ref user) = self.cross_user.user {
781                if !user.is_process_user() {
782                    if let Ok(entries) = std::fs::read_dir(&self.working_directory) {
783                        let files: Vec<String> = entries
784                            .filter_map(|e| e.ok())
785                            .map(|e| e.path().to_string_lossy().to_string())
786                            .collect();
787                        if !files.is_empty() {
788                            let mut args = vec![
789                                "-u".to_string(),
790                                user.user().to_string(),
791                                "-i".to_string(),
792                                "rm".to_string(),
793                                "-rf".to_string(),
794                                "--".to_string(),
795                            ];
796                            args.extend(files);
797                            let _ = std::process::Command::new("sudo")
798                                .args(&args)
799                                .stdin(std::process::Stdio::null())
800                                .stdout(std::process::Stdio::null())
801                                .stderr(std::process::Stdio::null())
802                                .status();
803                        }
804                    }
805                }
806            }
807
808            // If we own TempDirs, clean them up; otherwise fall back to remove_dir_all
809            if let Some(ref mut files_dir) = self._files_dir {
810                let _ = files_dir.cleanup();
811            }
812            if let Some(ref mut working_dir) = self._working_dir {
813                let _ = working_dir.cleanup();
814            } else {
815                let _ = std::fs::remove_dir_all(&self.working_directory);
816            }
817        }
818        self.state = SessionState::Ended;
819    }
820
821    /// Enter an environment asynchronously (non-blocking).
822    /// Returns the environment identifier on success.
823    pub async fn enter_environment(
824        &mut self,
825        env: &Environment,
826        resolved_symtab: Option<&openjd_expr::SerializedSymbolTable>,
827        identifier: Option<&str>,
828        os_env_vars: Option<&HashMap<String, String>>,
829    ) -> Result<String, SessionError> {
830        let (id, _stdout) = self
831            .enter_environment_with_output(env, resolved_symtab, identifier, os_env_vars)
832            .await?;
833        Ok(id)
834    }
835
836    /// Enter an environment, returning both the identifier and the stdout from the onEnter script.
837    pub async fn enter_environment_with_output(
838        &mut self,
839        env: &Environment,
840        resolved_symtab: Option<&openjd_expr::SerializedSymbolTable>,
841        identifier: Option<&str>,
842        os_env_vars: Option<&HashMap<String, String>>,
843    ) -> Result<(String, String), SessionError> {
844        if self.state != SessionState::Ready {
845            return Err(SessionError::InvalidState {
846                expected: vec![SessionState::Ready],
847                current: self.state,
848            });
849        }
850
851        let symtab = self.build_symbol_table(None, resolved_symtab)?;
852
853        let identifier = match identifier {
854            Some(id) => {
855                if self.environments.contains_key(id) {
856                    return Err(SessionError::DuplicateEnvironment { id: id.to_string() });
857                }
858                id.to_string()
859            }
860            None => format!("{}:{}", self.session_id, uuid::Uuid::new_v4().simple()),
861        };
862        self.environments.insert(identifier.clone(), env.clone());
863        self.environments_entered.push(identifier.clone());
864        self.created_env_vars
865            .insert(identifier.clone(), HashMap::new());
866
867        // Set static variables
868        if let Some(vars) = &env.variables {
869            for (key, fmt_str) in vars {
870                let value = fmt_str
871                    .resolve_string_with(
872                        &symtab,
873                        &openjd_expr::FormatStringOptions::new().with_library(self.lib()),
874                    )
875                    .map_err(|e| SessionError::FormatString {
876                        context: format!("env var '{key}'"),
877                        reason: e.to_string(),
878                    })?;
879                let norm_key = normalize_env_key(key);
880                self.env_vars.insert(norm_key.clone(), value.clone());
881                if let Some(changes) = self.created_env_vars.get_mut(&identifier) {
882                    changes.insert(norm_key, Some(value));
883                }
884            }
885        }
886
887        let output = if env
888            .script
889            .as_ref()
890            .and_then(|s| s.actions.on_enter.as_ref())
891            .is_some()
892        {
893            self.action.reset();
894            self.state = SessionState::Running;
895            self.notify_callback();
896
897            log_section_banner(
898                &self.session_id,
899                &format!("Entering Environment: {}", env.name),
900            );
901
902            let cancel_token = self.new_action_cancel_token();
903            let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
904            self.cancel.token = Some(cancel_token.clone());
905            self.cancel.request_tx = Some(cancel_tx);
906
907            let env_vars = self.evaluate_env_vars(os_env_vars);
908            let mut action_symtab = symtab.clone();
909            self.materialize_path_mapping(&mut action_symtab)?;
910            // Box large locals — see run_task for rationale.
911            let action_symtab = Box::new(action_symtab);
912            let env_vars = Box::new(env_vars);
913            #[allow(unused_mut)]
914            let mut runner = EnvironmentScriptRunner::new(
915                &self.session_id,
916                self.working_directory.clone(),
917                self.files_directory.clone(),
918                self.cross_user.user.clone(),
919            )
920            .with_redactions(self.redactions_enabled())
921            .with_debug_collect_stdout(self.debug_collect_stdout)
922            .with_echo_openjd_directives(self.echo_openjd_directives)
923            .with_initial_redacted_values(self.redacted_values.iter().cloned().collect())
924            .with_cancel_token(cancel_token)
925            .with_cancel_request_rx(cancel_rx);
926            if let Some(ref hdir) = self.cross_user.helpers_dir {
927                runner = runner.with_helpers_directory(hdir.clone());
928            }
929            let mut runner = match self.cross_user.helper.take() {
930                Some(h) => {
931                    let r = runner.with_helper(h);
932                    match self
933                        .cross_user
934                        .cancel_writer
935                        .as_ref()
936                        .and_then(|f| f.try_clone().ok())
937                    {
938                        Some(w) => r.with_cancel_writer(w),
939                        None => r,
940                    }
941                }
942                None => runner,
943            };
944
945            let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
946
947            let lib = self.library.clone();
948            // Box::pin keeps the inner subprocess/select! state machine off the
949            // outer future's stack. Without this, the combined future exceeds
950            // Windows' default 1 MB thread stack in release builds.
951            let runner_fut = Box::pin(runner.enter(env, &action_symtab, Some(&lib), &env_vars, tx));
952            let result = self.drive_action(runner_fut, &mut rx, &identifier).await;
953            self.cross_user.helper = runner.take_helper();
954            let result = result?;
955
956            if result.state != ActionState::Success {
957                return Err(SessionError::EnvironmentScriptFailed {
958                    name: env.name.clone(),
959                    action: "onEnter".into(),
960                    reason: format_exit_code(result.exit_code),
961                });
962            }
963            result.stdout.clone()
964        } else {
965            let now = std::time::SystemTime::now();
966            self.action.state = Some(ActionState::Success);
967            self.action.started_at = Some(now);
968            self.action.progress = None;
969            self.action.status_message = None;
970            self.action.fail_message = None;
971            self.action.exit_code = None;
972
973            log_section_banner(
974                &self.session_id,
975                &format!("Entering Environment: {}", env.name),
976            );
977
978            self.action.ended_at = Some(std::time::SystemTime::now());
979
980            if let Some(cb) = &self.callback {
981                if let Some(status) = self.action_status() {
982                    cb(&self.session_id, &status);
983                }
984            }
985
986            String::new()
987        };
988
989        if self.state == SessionState::Running {
990            self.state = SessionState::Ready;
991        }
992        Ok((identifier, output))
993    }
994
995    /// Exit an environment asynchronously, identified by its environment identifier.
996    ///
997    /// Environments must be exited in LIFO order (last entered, first exited).
998    /// The `keep_session_running` parameter controls whether the session transitions
999    /// to `ReadyEnding` after exit — if `false`, the session becomes ending-only.
1000    pub async fn exit_environment(
1001        &mut self,
1002        identifier: &EnvironmentIdentifier,
1003        resolved_symtab: Option<&openjd_expr::SerializedSymbolTable>,
1004        keep_session_running: bool,
1005        os_env_vars: Option<&HashMap<String, String>>,
1006    ) -> Result<String, SessionError> {
1007        if self.state != SessionState::Ready && self.state != SessionState::ReadyEnding {
1008            return Err(SessionError::InvalidState {
1009                expected: vec![SessionState::Ready, SessionState::ReadyEnding],
1010                current: self.state,
1011            });
1012        }
1013
1014        // Validate identifier exists
1015        let env = self
1016            .environments
1017            .get(identifier)
1018            .ok_or_else(|| SessionError::UnknownEnvironment {
1019                identifier: identifier.to_string(),
1020            })?
1021            .clone();
1022
1023        // Validate LIFO order
1024        if self.environments_entered.last() != Some(identifier) {
1025            return Err(SessionError::LifoViolation {
1026                expected: self
1027                    .environments_entered
1028                    .last()
1029                    .cloned()
1030                    .unwrap_or_default(),
1031                got: identifier.clone(),
1032            });
1033        }
1034
1035        let symtab = self.build_symbol_table(None, resolved_symtab)?;
1036
1037        // Evaluate env vars BEFORE removing from tracking (matching Python)
1038        // Box to keep off the async state machine — see run_task for rationale.
1039        let env_vars = Box::new(self.evaluate_env_vars(os_env_vars));
1040
1041        // Unless overridden by the caller, once we've started exiting environments
1042        // we can only exit environments.
1043        if !keep_session_running {
1044            self.ending_only = true;
1045        }
1046
1047        // Remove environment from tracking BEFORE running the exit script.
1048        // This matches the Python session behavior — a failed exit is still an exit,
1049        // and subsequent exits must be able to proceed in LIFO order.
1050        self.environments.remove(identifier);
1051        self.environments_entered.pop();
1052
1053        let output = if env
1054            .script
1055            .as_ref()
1056            .and_then(|s| s.actions.on_exit.as_ref())
1057            .is_some()
1058        {
1059            self.action.reset();
1060            self.state = SessionState::Running;
1061            self.notify_callback();
1062
1063            log_section_banner(
1064                &self.session_id,
1065                &format!("Exiting Environment: {}", env.name),
1066            );
1067
1068            let cancel_token = self.new_action_cancel_token();
1069            let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
1070            self.cancel.token = Some(cancel_token.clone());
1071            self.cancel.request_tx = Some(cancel_tx);
1072
1073            let mut action_symtab = symtab.clone();
1074            self.materialize_path_mapping(&mut action_symtab)?;
1075            // Box large locals — see run_task for rationale.
1076            let action_symtab = Box::new(action_symtab);
1077            #[allow(unused_mut)]
1078            let mut runner = EnvironmentScriptRunner::new(
1079                &self.session_id,
1080                self.working_directory.clone(),
1081                self.files_directory.clone(),
1082                self.cross_user.user.clone(),
1083            )
1084            .with_redactions(self.redactions_enabled())
1085            .with_debug_collect_stdout(self.debug_collect_stdout)
1086            .with_echo_openjd_directives(self.echo_openjd_directives)
1087            .with_initial_redacted_values(self.redacted_values.iter().cloned().collect())
1088            .with_cancel_token(cancel_token)
1089            .with_cancel_request_rx(cancel_rx);
1090            if let Some(ref hdir) = self.cross_user.helpers_dir {
1091                runner = runner.with_helpers_directory(hdir.clone());
1092            }
1093            let mut runner = match self.cross_user.helper.take() {
1094                Some(h) => {
1095                    let r = runner.with_helper(h);
1096                    match self
1097                        .cross_user
1098                        .cancel_writer
1099                        .as_ref()
1100                        .and_then(|f| f.try_clone().ok())
1101                    {
1102                        Some(w) => r.with_cancel_writer(w),
1103                        None => r,
1104                    }
1105                }
1106                None => runner,
1107            };
1108
1109            let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1110
1111            let lib = self.library.clone();
1112            // See the note in the onEnter path about Box::pin and the Windows
1113            // 1 MB thread-stack limit on release builds.
1114            let runner_fut = Box::pin(runner.exit(&env, &action_symtab, Some(&lib), &env_vars, tx));
1115            let result = self.drive_action(runner_fut, &mut rx, identifier).await;
1116            self.cross_user.helper = runner.take_helper();
1117            let result = result?;
1118
1119            if result.state != ActionState::Success {
1120                self.state = SessionState::ReadyEnding;
1121                return Err(SessionError::EnvironmentScriptFailed {
1122                    name: env.name.clone(),
1123                    action: "onExit".into(),
1124                    reason: format_exit_code(result.exit_code),
1125                });
1126            }
1127            result.stdout.clone()
1128        } else {
1129            // No exit script — set state based on ending_only (drive_action not called)
1130            let now = std::time::SystemTime::now();
1131            self.action.state = Some(ActionState::Success);
1132            self.action.started_at = Some(now);
1133            self.action.progress = None;
1134            self.action.status_message = None;
1135            self.action.fail_message = None;
1136            self.action.exit_code = None;
1137            self.state = if self.ending_only {
1138                SessionState::ReadyEnding
1139            } else {
1140                SessionState::Ready
1141            };
1142
1143            log_section_banner(
1144                &self.session_id,
1145                &format!("Exiting Environment: {}", env.name),
1146            );
1147
1148            self.action.ended_at = Some(std::time::SystemTime::now());
1149
1150            if let Some(cb) = &self.callback {
1151                if let Some(status) = self.action_status() {
1152                    cb(&self.session_id, &status);
1153                }
1154            }
1155
1156            String::new()
1157        };
1158
1159        Ok(output)
1160    }
1161
1162    /// Run a step action asynchronously.
1163    pub async fn run_task(
1164        &mut self,
1165        script: &StepScript,
1166        task_parameter_values: Option<&openjd_model::types::TaskParameterSet>,
1167        resolved_symtab: Option<&openjd_expr::SerializedSymbolTable>,
1168        os_env_vars: Option<&HashMap<String, String>>,
1169    ) -> Result<ActionResult, SessionError> {
1170        if self.state != SessionState::Ready {
1171            return Err(SessionError::InvalidState {
1172                expected: vec![SessionState::Ready],
1173                current: self.state,
1174            });
1175        }
1176
1177        let symtab = self.build_symbol_table(task_parameter_values, resolved_symtab)?;
1178
1179        self.action.reset();
1180        self.state = SessionState::Running;
1181        self.notify_callback();
1182
1183        log_section_banner(&self.session_id, "Running Task");
1184
1185        let cancel_token = self.new_action_cancel_token();
1186        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
1187        self.cancel.token = Some(cancel_token.clone());
1188        self.cancel.request_tx = Some(cancel_tx);
1189
1190        let env_vars = self.evaluate_env_vars(os_env_vars);
1191        let mut action_symtab = symtab.clone();
1192        self.materialize_path_mapping(&mut action_symtab)?;
1193        // Box large locals so they live on the heap instead of inflating
1194        // this async fn's state machine. Without this, the combined future
1195        // (run_task → drive_action → select!) exceeds Windows' default
1196        // 1 MB thread stack in release builds.
1197        let action_symtab = Box::new(action_symtab);
1198        let env_vars = Box::new(env_vars);
1199        #[allow(unused_mut)]
1200        let mut runner = StepScriptRunner::new(
1201            &self.session_id,
1202            self.working_directory.clone(),
1203            self.files_directory.clone(),
1204            self.cross_user.user.clone(),
1205        )
1206        .with_redactions(self.redactions_enabled())
1207        .with_debug_collect_stdout(self.debug_collect_stdout)
1208        .with_echo_openjd_directives(self.echo_openjd_directives)
1209        .with_initial_redacted_values(self.redacted_values.iter().cloned().collect())
1210        .with_cancel_token(cancel_token)
1211        .with_cancel_request_rx(cancel_rx);
1212        if let Some(ref hdir) = self.cross_user.helpers_dir {
1213            runner = runner.with_helpers_directory(hdir.clone());
1214        }
1215        let mut runner = match self.cross_user.helper.take() {
1216            Some(h) => {
1217                let r = runner.with_helper(h);
1218                match self
1219                    .cross_user
1220                    .cancel_writer
1221                    .as_ref()
1222                    .and_then(|f| f.try_clone().ok())
1223                {
1224                    Some(w) => r.with_cancel_writer(w),
1225                    None => r,
1226                }
1227            }
1228            None => runner,
1229        };
1230
1231        let step_identifier = format!("{}:step:{}", self.session_id, uuid::Uuid::new_v4().simple());
1232        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1233
1234        let lib = self.library.clone();
1235        // See the note in the onEnter path about Box::pin and the Windows
1236        // 1 MB thread-stack limit on release builds.
1237        let runner_fut = Box::pin(runner.run(script, &action_symtab, Some(&lib), &env_vars, tx));
1238        let result = self
1239            .drive_action(runner_fut, &mut rx, &step_identifier)
1240            .await;
1241        self.cross_user.helper = runner.take_helper();
1242        let result = result?;
1243
1244        Ok(ActionResult {
1245            state: result.state,
1246            exit_code: result.exit_code,
1247            stdout: result.stdout,
1248        })
1249    }
1250
1251    /// Run an ad-hoc subprocess within the Session.
1252    ///
1253    /// Unlike `run_task`, this runs a raw command without format string
1254    /// evaluation, embedded file materialization, or path mapping. Used by the
1255    /// worker agent for install/sync operations.
1256    pub async fn run_subprocess(
1257        &mut self,
1258        command: &str,
1259        args: Option<&[String]>,
1260        timeout: Option<Duration>,
1261        os_env_vars: Option<&HashMap<String, String>>,
1262        use_session_env_vars: bool,
1263        log_banner_message: Option<&str>,
1264    ) -> Result<crate::subprocess::SubprocessResult, SessionError> {
1265        if self.state != SessionState::Ready {
1266            return Err(SessionError::InvalidState {
1267                expected: vec![SessionState::Ready],
1268                current: self.state,
1269            });
1270        }
1271        if command.is_empty() || command.trim().is_empty() {
1272            return Err(SessionError::Runtime(
1273                "command must be a non-empty string".into(),
1274            ));
1275        }
1276        if let Some(t) = timeout {
1277            if t.is_zero() {
1278                return Err(SessionError::Runtime("timeout must be positive".into()));
1279            }
1280        }
1281
1282        if let Some(msg) = log_banner_message {
1283            log_section_banner(&self.session_id, msg);
1284        }
1285
1286        self.action.reset();
1287        self.state = SessionState::Running;
1288        self.notify_callback();
1289
1290        let env_vars = if use_session_env_vars {
1291            self.evaluate_env_vars(os_env_vars)
1292        } else {
1293            let mut result: HashMap<String, Option<String>> = self
1294                .process_env
1295                .iter()
1296                .map(|(k, v)| (k.clone(), Some(v.clone())))
1297                .collect();
1298            if let Some(extra) = os_env_vars {
1299                for (k, v) in extra {
1300                    result.insert(k.clone(), Some(v.clone()));
1301                }
1302            }
1303            result
1304        };
1305
1306        let mut cmd_args = vec![command.to_string()];
1307        if let Some(a) = args {
1308            cmd_args.extend(a.iter().cloned());
1309        }
1310
1311        // Route through the persistent helper process when cross-user helper is active.
1312        if self.cross_user.helper.is_some() {
1313            return self
1314                .run_subprocess_via_helper(&cmd_args, env_vars, timeout)
1315                .await;
1316        }
1317
1318        let cancel_token = self.new_action_cancel_token();
1319        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
1320        self.cancel.token = Some(cancel_token.clone());
1321        self.cancel.request_tx = Some(cancel_tx);
1322
1323        let config = crate::subprocess::SubprocessConfig {
1324            args: cmd_args,
1325            env_vars,
1326            working_dir: Some(self.working_directory.clone()),
1327            timeout,
1328            user: self.cross_user.user.clone(),
1329            cancel_method: crate::runner::CancelMethod::Terminate,
1330            cancel_request_rx: Some(cancel_rx),
1331            debug_collect_stdout: self.debug_collect_stdout,
1332        };
1333        let mut filter = crate::action_filter::ActionFilter::new(
1334            &self.session_id,
1335            self.echo_openjd_directives,
1336            false,
1337        );
1338        let subprocess_identifier = format!(
1339            "{}:subprocess:{}",
1340            self.session_id,
1341            uuid::Uuid::new_v4().simple()
1342        );
1343        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1344        let sid = self.session_id.clone();
1345        // See the note in the onEnter path about Box::pin and the Windows
1346        // 1 MB thread-stack limit on release builds.
1347        let runner_fut = Box::pin(crate::subprocess::run_subprocess(
1348            config,
1349            &mut filter,
1350            &sid,
1351            tx,
1352            cancel_token,
1353        ));
1354        self.drive_action(runner_fut, &mut rx, &subprocess_identifier)
1355            .await
1356    }
1357
1358    /// Execute a subprocess via the persistent cross-user helper process.
1359    ///
1360    /// Instead of spawning `sudo` (POSIX) or `CreateProcessAsUserW` (Windows)
1361    /// per action, sends a run command over the helper's stdin and reads
1362    /// streamed responses from its stdout.
1363    async fn run_subprocess_via_helper(
1364        &mut self,
1365        args: &[String],
1366        env_vars: std::collections::HashMap<String, Option<String>>,
1367        _timeout: Option<Duration>,
1368    ) -> Result<crate::subprocess::SubprocessResult, SessionError> {
1369        let mut filter = crate::action_filter::ActionFilter::new(
1370            &self.session_id,
1371            self.echo_openjd_directives,
1372            false,
1373        );
1374        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1375        let subprocess_identifier = format!(
1376            "{}:subprocess:{}",
1377            self.session_id,
1378            uuid::Uuid::new_v4().simple()
1379        );
1380
1381        let config = crate::subprocess::SubprocessConfig {
1382            args: args.to_vec(),
1383            env_vars,
1384            working_dir: Some(self.working_directory.clone()),
1385            timeout: _timeout,
1386            user: self.cross_user.user.clone(),
1387            cancel_method: crate::runner::CancelMethod::Terminate,
1388            cancel_request_rx: None,
1389            debug_collect_stdout: self.debug_collect_stdout,
1390        };
1391
1392        let helper = self
1393            .cross_user
1394            .helper
1395            .as_mut()
1396            .expect("caller checked helper.is_some()");
1397        let result = run_via_helper(
1398            helper,
1399            &config,
1400            &mut filter,
1401            &self.session_id,
1402            tx,
1403            self.cross_user.cancel_writer.as_ref(),
1404        )
1405        .await;
1406
1407        // Process any remaining messages.
1408        while let Ok(msg) = rx.try_recv() {
1409            self.apply_message(msg, &subprocess_identifier);
1410        }
1411
1412        let r = result?;
1413        self.action.state = Some(r.state);
1414        self.action.ended_at = Some(std::time::SystemTime::now());
1415        self.action.exit_code = r.exit_code;
1416        self.cancel.token = None;
1417        self.cancel.request_tx = None;
1418        self.cancel.mark_failed = false;
1419        self.state = if r.state == ActionState::Success {
1420            SessionState::Ready
1421        } else {
1422            SessionState::ReadyEnding
1423        };
1424        self.notify_callback();
1425        Ok(r)
1426    }
1427
1428    // --- Internal helpers ---
1429
1430    /// Run an action future while concurrently processing messages from the channel
1431    /// in real-time. This ensures callbacks fire as stdout lines are parsed, not
1432    /// after the action completes.
1433    async fn drive_action<F>(
1434        &mut self,
1435        action_fut: F,
1436        rx: &mut tokio::sync::mpsc::UnboundedReceiver<ActionMessage>,
1437        identifier: &str,
1438    ) -> Result<crate::subprocess::SubprocessResult, SessionError>
1439    where
1440        F: std::future::Future<Output = Result<crate::subprocess::SubprocessResult, SessionError>>,
1441    {
1442        tokio::pin!(action_fut);
1443        let mut result = None;
1444
1445        loop {
1446            tokio::select! {
1447                biased;
1448                msg = rx.recv(), if result.is_none() => {
1449                    // Channel returns None when closed; runner will finish soon.
1450                    if let Some(msg) = msg { self.apply_message(msg, identifier) }
1451                }
1452                r = &mut action_fut, if result.is_none() => {
1453                    result = Some(r);
1454                }
1455                else => break,
1456            }
1457            if result.is_some() {
1458                // Drain any remaining messages after the runner completes
1459                while let Ok(msg) = rx.try_recv() {
1460                    self.apply_message(msg, identifier);
1461                }
1462                break;
1463            }
1464        }
1465
1466        let r = match result.expect("loop guarantees result is Some") {
1467            Ok(r) => r,
1468            Err(e) => {
1469                // The subprocess failed to start or the runner encountered an error.
1470                // Update session state so callers see Failed instead of stuck Running.
1471                self.action.state = Some(ActionState::Failed);
1472                self.action.ended_at = Some(std::time::SystemTime::now());
1473                self.action.exit_code = None;
1474                self.cancel.token = None;
1475                self.cancel.request_tx = None;
1476                self.cancel.mark_failed = false;
1477                self.state = SessionState::ReadyEnding;
1478
1479                if let Some(cb) = &self.callback {
1480                    if let Some(status) = self.action_status() {
1481                        cb(&self.session_id, &status);
1482                    }
1483                }
1484
1485                return Err(e);
1486            }
1487        };
1488
1489        // If the action was canceled but mark_action_failed is set,
1490        // report it as Failed instead of Canceled (matches Python behavior)
1491        let final_state = if self.cancel.mark_failed && r.state == ActionState::Canceled {
1492            ActionState::Failed
1493        } else {
1494            r.state
1495        };
1496
1497        self.action.state = Some(final_state);
1498        self.action.ended_at = Some(std::time::SystemTime::now());
1499        self.action.exit_code = r.exit_code;
1500        self.cancel.token = None;
1501        self.cancel.request_tx = None;
1502        self.cancel.mark_failed = false;
1503
1504        self.state = if self.ending_only || final_state != ActionState::Success {
1505            SessionState::ReadyEnding
1506        } else {
1507            SessionState::Ready
1508        };
1509
1510        if let Some(cb) = &self.callback {
1511            if let Some(status) = self.action_status() {
1512                cb(&self.session_id, &status);
1513            }
1514        }
1515
1516        Ok(crate::subprocess::SubprocessResult {
1517            state: final_state,
1518            exit_code: r.exit_code,
1519            stdout: r.stdout,
1520        })
1521    }
1522
1523    /// Apply a single ActionMessage to session state.
1524    fn apply_message(&mut self, msg: ActionMessage, identifier: &str) {
1525        match msg {
1526            ActionMessage::Progress(v) => {
1527                self.action.progress = Some(v);
1528            }
1529            ActionMessage::Status(s) => {
1530                self.action.status_message = Some(s);
1531            }
1532            ActionMessage::Fail(s) => {
1533                self.action.fail_message = Some(s);
1534            }
1535            ActionMessage::SetEnv { name, value } => {
1536                let key = normalize_env_key(&name);
1537                self.env_vars.insert(key.clone(), value.clone());
1538                if let Some(changes) = self.created_env_vars.get_mut(identifier) {
1539                    changes.insert(key, Some(value));
1540                }
1541            }
1542            ActionMessage::UnsetEnv { name } => {
1543                let key = normalize_env_key(&name);
1544                self.env_vars.remove(&key);
1545                if let Some(changes) = self.created_env_vars.get_mut(identifier) {
1546                    changes.insert(key, None);
1547                }
1548            }
1549            ActionMessage::RedactedEnv { name, value } => {
1550                if self.redactions_enabled() {
1551                    let key = normalize_env_key(&name);
1552                    self.env_vars.insert(key.clone(), value.clone());
1553                    if let Some(changes) = self.created_env_vars.get_mut(identifier) {
1554                        changes.insert(key, Some(value.clone()));
1555                    }
1556                }
1557                self.redacted_values.insert(value);
1558            }
1559            ActionMessage::CancelMarkFailed { fail_message } => {
1560                self.action.fail_message = Some(fail_message);
1561                let _ = self.cancel_action(None, true);
1562            }
1563        }
1564        if let Some(cb) = &self.callback {
1565            if let Some(status) = self.action_status() {
1566                cb(&self.session_id, &status);
1567            }
1568        }
1569    }
1570
1571    /// Materialize path mapping rules to a JSON file and set symbol table entries.
1572    fn materialize_path_mapping(&self, symtab: &mut SymbolTable) -> Result<(), SessionError> {
1573        let has_rules = !self.path_mapping_rules.is_empty();
1574        let rules_json = if has_rules {
1575            let rules: Vec<serde_json::Value> = self
1576                .path_mapping_rules
1577                .iter()
1578                .map(|rule| {
1579                    serde_json::json!({
1580                        "source_path_format": match rule.source_path_format {
1581                            openjd_expr::path_mapping::PathFormat::Posix => "POSIX",
1582                            openjd_expr::path_mapping::PathFormat::Windows => "WINDOWS",
1583                            openjd_expr::path_mapping::PathFormat::Uri => "URI",
1584                        },
1585                        "source_path": &rule.source_path,
1586                        "destination_path": &rule.destination_path,
1587                    })
1588                })
1589                .collect();
1590            serde_json::json!({"version": "pathmapping-1.0", "path_mapping_rules": rules})
1591                .to_string()
1592        } else {
1593            serde_json::json!({}).to_string()
1594        };
1595
1596        symtab
1597            .set(
1598                "Session.HasPathMappingRules",
1599                openjd_expr::ExprValue::Bool(has_rules),
1600            )
1601            .map_err(|e| {
1602                SessionError::Runtime(format!("Failed to set HasPathMappingRules: {e}"))
1603            })?;
1604
1605        let filename = self.working_directory.join(format!(
1606            "pathmapping_{}.json",
1607            uuid::Uuid::new_v4().simple()
1608        ));
1609        std::fs::write(&filename, &rules_json).map_err(|e| SessionError::WorkingDirectory {
1610            path: filename.clone(),
1611            source: e,
1612        })?;
1613
1614        symtab
1615            .set(
1616                "Session.PathMappingRulesFile",
1617                openjd_expr::ExprValue::new_path(
1618                    filename.to_string_lossy().to_string(),
1619                    openjd_expr::path_mapping::PathFormat::host(),
1620                ),
1621            )
1622            .map_err(|e| {
1623                SessionError::Runtime(format!("Failed to set PathMappingRulesFile: {e}"))
1624            })?;
1625
1626        Ok(())
1627    }
1628
1629    /// Evaluate the cumulative env vars from process_env + extra + per-environment changes.
1630    /// Mirrors Python `_evaluate_current_session_env_vars`.
1631    pub fn evaluate_env_vars(
1632        &self,
1633        extra: Option<&HashMap<String, String>>,
1634    ) -> HashMap<String, Option<String>> {
1635        let mut result: HashMap<String, Option<String>> = self
1636            .process_env
1637            .iter()
1638            .map(|(k, v)| (normalize_env_key(k), Some(v.clone())))
1639            .collect();
1640        // Per spec: expose the session working directory as an environment variable
1641        // so nested subprocesses can access it without template variable syntax.
1642        result.insert(
1643            "OPENJD_SESSION_WORKING_DIR".to_string(),
1644            Some(self.working_directory.to_string_lossy().into_owned()),
1645        );
1646        if let Some(extra) = extra {
1647            for (k, v) in extra {
1648                result.insert(normalize_env_key(k), Some(v.clone()));
1649            }
1650        }
1651        for id in &self.environments_entered {
1652            if let Some(changes) = self.created_env_vars.get(id) {
1653                for (name, value) in changes {
1654                    result.insert(name.clone(), value.clone());
1655                }
1656            }
1657        }
1658        result
1659    }
1660
1661    /// Get the job parameter values.
1662    pub fn job_parameter_values(&self) -> &JobParameterValues {
1663        &self.job_parameter_values
1664    }
1665
1666    /// Build a SymbolTable for running actions, mirroring Python's Session._symbol_table().
1667    /// Populates job parameters (Param.* and RawParam.*), task parameters (Task.Param.* and Task.RawParam.*),
1668    /// and Session.WorkingDirectory.
1669    ///
1670    /// If `base` is provided, clones it as the starting point (it already contains Param.*, RawParam.*,
1671    /// Job.Name, Step.Name, and let bindings) and only layers Session.WorkingDirectory and Task.* on top.
1672    /// If `base` is None, builds from scratch using self.job_parameter_values.
1673    pub fn build_symbol_table(
1674        &self,
1675        task_parameter_values: Option<&openjd_model::types::TaskParameterSet>,
1676        base: Option<&openjd_expr::SerializedSymbolTable>,
1677    ) -> Result<SymbolTable, SessionError> {
1678        use openjd_model::types::TaskParameterType;
1679
1680        let mut symtab = if let Some(base) = base {
1681            // Deserialize with host path format — this is the template→session boundary.
1682            // Path values stored as Posix in template scope get normalized to host format.
1683            let mut s = base
1684                .to_symtab(openjd_expr::path_mapping::PathFormat::host())
1685                .map_err(|e| {
1686                    SessionError::Runtime(format!("Failed to deserialize resolved_symtab: {e}"))
1687                })?;
1688            // Re-apply path mapping to Param.* PATH values from the base symtab.
1689            // The base (resolved_symtab from create_job) has unmapped paths; the session
1690            // knows the worker's path mapping rules.
1691            for (name, param) in &self.job_parameter_values {
1692                use openjd_model::types::JobParameterType;
1693                match param.param_type {
1694                    JobParameterType::Path => {
1695                        let raw = match &param.value {
1696                            openjd_expr::ExprValue::String(s) => s.as_str(),
1697                            openjd_expr::ExprValue::Path { value, .. } => value.as_str(),
1698                            _ => continue,
1699                        };
1700                        let mapped = self.apply_path_mapping_to_string(raw);
1701                        let key = format!("Param.{name}");
1702                        s.set(
1703                            &key,
1704                            openjd_expr::ExprValue::new_path(
1705                                mapped,
1706                                openjd_expr::path_mapping::PathFormat::host(),
1707                            ),
1708                        )
1709                        .map_err(|e| SessionError::Runtime(format!("Failed to set {key}: {e}")))?;
1710                    }
1711                    JobParameterType::ListPath => {
1712                        if let openjd_expr::ExprValue::ListString(ref elements, _) = param.value {
1713                            let mapped: Vec<openjd_expr::ExprValue> = elements
1714                                .iter()
1715                                .map(|s| {
1716                                    let m = self.apply_path_mapping_to_string(s);
1717                                    openjd_expr::ExprValue::new_path(
1718                                        m,
1719                                        openjd_expr::path_mapping::PathFormat::host(),
1720                                    )
1721                                })
1722                                .collect();
1723                            let key = format!("Param.{name}");
1724                            s.set(
1725                                &key,
1726                                openjd_expr::ExprValue::make_list(
1727                                    mapped,
1728                                    openjd_expr::ExprType::PATH,
1729                                )
1730                                .unwrap(),
1731                            )
1732                            .map_err(|e| {
1733                                SessionError::Runtime(format!("Failed to set {key}: {e}"))
1734                            })?;
1735                        }
1736                    }
1737                    _ => {}
1738                }
1739            }
1740            s
1741        } else {
1742            let mut s = SymbolTable::new();
1743            for (name, param) in &self.job_parameter_values {
1744                let raw_key = format!("RawParam.{name}");
1745                s.set(&raw_key, param.value.clone())
1746                    .map_err(|e| SessionError::Runtime(format!("Failed to set {raw_key}: {e}")))?;
1747                let key = format!("Param.{name}");
1748                use openjd_model::types::JobParameterType;
1749                let mapped_value = match param.param_type {
1750                    JobParameterType::Path => {
1751                        let raw = match &param.value {
1752                            openjd_expr::ExprValue::String(s) => s.as_str(),
1753                            openjd_expr::ExprValue::Path { value, .. } => value.as_str(),
1754                            _ => "",
1755                        };
1756                        let mapped = self.apply_path_mapping_to_string(raw);
1757                        openjd_expr::ExprValue::new_path(
1758                            mapped,
1759                            openjd_expr::path_mapping::PathFormat::host(),
1760                        )
1761                    }
1762                    JobParameterType::ListPath => match &param.value {
1763                        openjd_expr::ExprValue::ListString(elements, _) => {
1764                            let mapped: Vec<openjd_expr::ExprValue> = elements
1765                                .iter()
1766                                .map(|s| {
1767                                    let m = self.apply_path_mapping_to_string(s);
1768                                    openjd_expr::ExprValue::new_path(
1769                                        m,
1770                                        openjd_expr::path_mapping::PathFormat::host(),
1771                                    )
1772                                })
1773                                .collect();
1774                            openjd_expr::ExprValue::make_list(mapped, openjd_expr::ExprType::PATH)
1775                                .unwrap()
1776                        }
1777                        other => self.apply_path_mapping_to_value(other),
1778                    },
1779                    _ => self.apply_path_mapping_to_value(&param.value),
1780                };
1781                s.set(&key, mapped_value)
1782                    .map_err(|e| SessionError::Runtime(format!("Failed to set {key}: {e}")))?;
1783            }
1784            s
1785        };
1786
1787        let host_path_format = openjd_expr::path_mapping::PathFormat::host();
1788        symtab
1789            .set(
1790                "Session.WorkingDirectory",
1791                openjd_expr::ExprValue::new_path(
1792                    self.working_directory.to_string_lossy().to_string(),
1793                    host_path_format,
1794                ),
1795            )
1796            .map_err(|e| SessionError::Runtime(format!("Failed to set WorkingDirectory: {e}")))?;
1797
1798        if let Some(task_params) = task_parameter_values {
1799            for (name, tv) in task_params {
1800                // Task.RawParam.* — raw string value
1801                let raw_key = format!("Task.RawParam.{name}");
1802                symtab
1803                    .set(&raw_key, tv.value.clone())
1804                    .map_err(|e| SessionError::Runtime(format!("Failed to set {raw_key}: {e}")))?;
1805
1806                // Task.Param.* — typed value with path mapping applied for PATH types
1807                let key = format!("Task.Param.{name}");
1808                let param_value = match tv.param_type {
1809                    TaskParameterType::Path => {
1810                        let s = match &tv.value {
1811                            openjd_expr::ExprValue::String(s) => s.as_str(),
1812                            openjd_expr::ExprValue::Path { value, .. } => value.as_str(),
1813                            _ => "",
1814                        };
1815                        let mapped = self.apply_path_mapping_to_string(s);
1816                        openjd_expr::ExprValue::new_path(
1817                            mapped,
1818                            openjd_expr::path_mapping::PathFormat::host(),
1819                        )
1820                    }
1821                    _ => tv.value.clone(),
1822                };
1823                symtab
1824                    .set(&key, param_value)
1825                    .map_err(|e| SessionError::Runtime(format!("Failed to set {key}: {e}")))?;
1826            }
1827        }
1828
1829        Ok(symtab)
1830    }
1831
1832    /// Apply path mapping rules to a string, returning the mapped result.
1833    fn apply_path_mapping_to_string(&self, path: &str) -> String {
1834        for rule in self.path_mapping_rules.iter() {
1835            if let Some(mapped) = rule.apply(path) {
1836                return mapped;
1837            }
1838        }
1839        path.to_string()
1840    }
1841
1842    /// Apply path mapping rules to a value if it's a Path or ListPath type.
1843    fn apply_path_mapping_to_value(
1844        &self,
1845        value: &openjd_expr::ExprValue,
1846    ) -> openjd_expr::ExprValue {
1847        match value {
1848            openjd_expr::ExprValue::Path {
1849                value: path_str,
1850                format,
1851                ..
1852            } => {
1853                for rule in self.path_mapping_rules.iter() {
1854                    if let Some(mapped) = rule.apply(path_str) {
1855                        return openjd_expr::ExprValue::new_path(mapped, *format);
1856                    }
1857                }
1858                value.clone()
1859            }
1860            openjd_expr::ExprValue::ListPath(elements, fmt, _) => {
1861                let mapped: Vec<openjd_expr::ExprValue> = elements
1862                    .iter()
1863                    .map(|s| {
1864                        let mapped_s =
1865                            openjd_expr::path_mapping::apply_rules(&self.path_mapping_rules, s);
1866                        openjd_expr::ExprValue::new_path(mapped_s, *fmt)
1867                    })
1868                    .collect();
1869                openjd_expr::ExprValue::make_list(mapped, openjd_expr::ExprType::PATH).unwrap()
1870            }
1871            _ => value.clone(),
1872        }
1873    }
1874}
1875
1876impl Drop for Session {
1877    fn drop(&mut self) {
1878        if !self.cleanup_called {
1879            // `session_id` is an opaque correlation identifier, not a secret.
1880            // Including it in this warning is essential for diagnosing which
1881            // session leaked its working directory. See the module-level docs
1882            // for rationale.
1883            log::warn!(
1884                target: "openjd.sessions",
1885                "Session '{}' was dropped without calling cleanup(). \
1886                 Working directory may not have been cleaned up.",
1887                self.session_id
1888            );
1889            if !self.retain_working_dir {
1890                let _ = std::fs::remove_dir_all(&self.working_directory);
1891            }
1892        }
1893    }
1894}