Skip to main content

agent_exec/
create.rs

1//! Implementation of the `create` sub-command.
2//!
3//! `create` persists a full job definition without launching the supervisor or
4//! child process.  The job is left in `created` state so that `start` can
5//! launch it later.
6
7use anyhow::{Context, Result};
8use tracing::info;
9
10use crate::jobstore::{JobDir, generate_job_id, resolve_root};
11use crate::run::{
12    mask_env_vars, materialize_stdin_for_job, pre_create_log_files, resolve_effective_cwd,
13    validate_stdin_source,
14};
15use crate::schema::{CreateData, JobMeta, JobMetaJob, Response};
16use crate::tag::dedup_tags;
17
18/// Options for the `create` sub-command.
19///
20/// # Definition-time option alignment rule
21///
22/// Every definition-time option accepted here MUST also be accepted by `run` (and vice versa),
23/// since both commands write the same persisted job definition to `meta.json`. When adding a
24/// new persisted metadata field, wire it through both `create` and `run` unless the spec
25/// explicitly documents it as launch-only (e.g. snapshot timing, tail sizing, --wait).
26#[derive(Debug)]
27pub struct CreateOpts<'a> {
28    /// Command and arguments to execute when `start` is called.
29    pub command: Vec<String>,
30    /// Override for jobs root directory.
31    pub root: Option<&'a str>,
32    /// Timeout in milliseconds; 0 = no timeout.
33    pub timeout_ms: u64,
34    /// Milliseconds after SIGTERM before SIGKILL; 0 = immediate SIGKILL.
35    pub kill_after_ms: u64,
36    /// Working directory for the command.
37    pub cwd: Option<&'a str>,
38    /// Environment variables as KEY=VALUE strings (persisted as durable config).
39    pub env_vars: Vec<String>,
40    /// Paths to env files (persisted as file-path references, read at start time).
41    pub env_files: Vec<String>,
42    /// Whether to inherit the current process environment at start time (default: true).
43    pub inherit_env: bool,
44    /// Keys to mask in JSON output (values replaced with "***").
45    pub mask: Vec<String>,
46    /// Optional stdin source definition persisted and materialized for start.
47    pub stdin: Option<crate::run::StdinSource>,
48    /// Maximum bytes allowed for materialized stdin.bin.
49    pub stdin_max_bytes: u64,
50    /// Interval (ms) for state.json updated_at refresh; 0 = disabled.
51    pub progress_every_ms: u64,
52    /// Shell command string for command notification sink.
53    pub notify_command: Option<String>,
54    /// File path for NDJSON notification sink.
55    pub notify_file: Option<String>,
56    /// Resolved shell wrapper argv (e.g. ["sh", "-lc"]).
57    pub shell_wrapper: Vec<String>,
58    /// User-defined tags for this job (deduplicated preserving first-seen order).
59    pub tags: Vec<String>,
60    /// Pattern to match against output lines (output-match notification).
61    pub output_pattern: Option<String>,
62    /// Match type for output-match: "contains" or "regex".
63    pub output_match_type: Option<String>,
64    /// Stream selector: "stdout", "stderr", or "either".
65    pub output_stream: Option<String>,
66    /// Shell command string for output-match command sink.
67    pub output_command: Option<String>,
68    /// File path for output-match NDJSON file sink.
69    pub output_file: Option<String>,
70}
71
72/// Execute `create`: persist job definition and return JSON.
73pub fn execute(opts: CreateOpts) -> Result<()> {
74    if opts.command.is_empty() {
75        anyhow::bail!("no command specified for create");
76    }
77
78    let root = resolve_root(opts.root);
79    std::fs::create_dir_all(&root)
80        .with_context(|| format!("create jobs root {}", root.display()))?;
81
82    let job_id = generate_job_id(&root)?;
83    let created_at = crate::run::now_rfc3339_pub();
84
85    let env_keys: Vec<String> = opts
86        .env_vars
87        .iter()
88        .map(|kv| kv.split('=').next().unwrap_or(kv.as_str()).to_string())
89        .collect();
90
91    let masked_env_vars = mask_env_vars(&opts.env_vars, &opts.mask);
92
93    let effective_cwd = resolve_effective_cwd(opts.cwd);
94
95    // Build output-match config from definition-time options (same logic as `notify set`).
96    let on_output_match = crate::notify::build_output_match_config(
97        opts.output_pattern,
98        opts.output_match_type,
99        opts.output_stream,
100        opts.output_command,
101        opts.output_file,
102        None,
103    );
104
105    let notification =
106        if opts.notify_command.is_some() || opts.notify_file.is_some() || on_output_match.is_some()
107        {
108            Some(crate::schema::NotificationConfig {
109                notify_command: opts.notify_command.clone(),
110                notify_file: opts.notify_file.clone(),
111                on_output_match,
112            })
113        } else {
114            None
115        };
116
117    // Validate and deduplicate tags (preserving first-seen order).
118    let tags = dedup_tags(opts.tags)?;
119
120    let stdin_source = opts.stdin.clone();
121    validate_stdin_source(stdin_source.as_ref())?;
122
123    let meta = JobMeta {
124        job: JobMetaJob { id: job_id.clone() },
125        schema_version: crate::schema::SCHEMA_VERSION.to_string(),
126        command: opts.command.clone(),
127        created_at: created_at.clone(),
128        root: root.display().to_string(),
129        env_keys,
130        env_vars: masked_env_vars,
131        // Persist actual (unmasked) env vars for runtime use by `start`.
132        // --mask only affects display/metadata views; the real values are needed
133        // so `start` can apply them to the child process environment.
134        env_vars_runtime: opts.env_vars.clone(),
135        mask: opts.mask.clone(),
136        cwd: Some(effective_cwd),
137        notification,
138        tags,
139        // Execution-definition fields persisted for `start`.
140        inherit_env: opts.inherit_env,
141        env_files: opts.env_files.clone(),
142        timeout_ms: opts.timeout_ms,
143        kill_after_ms: opts.kill_after_ms,
144        progress_every_ms: opts.progress_every_ms,
145        shell_wrapper: Some(opts.shell_wrapper.clone()),
146        stdin_file: None,
147    };
148
149    let job_dir = JobDir::create(&root, &job_id, &meta)?;
150    let stdin_file =
151        materialize_stdin_for_job(&job_dir, stdin_source.as_ref(), opts.stdin_max_bytes)?;
152    if stdin_file.is_some() {
153        let mut meta_with_stdin = meta.clone();
154        meta_with_stdin.stdin_file = stdin_file;
155        job_dir.write_meta_atomic(&meta_with_stdin)?;
156    }
157    info!(job_id = %job_id, "created job directory (created state)");
158
159    // Pre-create empty log files.
160    pre_create_log_files(&job_dir)?;
161
162    // Write state.json with `created` status — no process spawned.
163    job_dir.init_state_created()?;
164
165    let stdout_log_path = job_dir.stdout_path().display().to_string();
166    let stderr_log_path = job_dir.stderr_path().display().to_string();
167
168    Response::new(
169        "create",
170        CreateData {
171            job_id,
172            state: "created".to_string(),
173            stdout_log_path,
174            stderr_log_path,
175        },
176    )
177    .print();
178
179    Ok(())
180}