1use anyhow::Result;
8use tracing::info;
9
10use crate::jobstore::{InvalidJobState, JobDir, resolve_root};
11use crate::run::{
12 SpawnSupervisorParams, mask_env_vars, observe_inline_output, spawn_supervisor_process,
13};
14use crate::schema::{JobStatus, Response, RunData};
15
16#[derive(Debug)]
18pub struct StartOpts<'a> {
19 pub job_id: &'a str,
21 pub root: Option<&'a str>,
23 pub no_auto_gc: bool,
25 pub auto_gc_older_than: Option<String>,
27 pub auto_gc_max_jobs: Option<u64>,
29 pub auto_gc_max_bytes: Option<u64>,
31 pub auto_gc_config: crate::gc::AutoGcConfig,
33 pub wait: bool,
35 pub until_seconds: u64,
37 pub forever: bool,
39 pub max_bytes: u64,
41}
42
43pub fn execute(opts: StartOpts) -> Result<()> {
45 let root = resolve_root(opts.root);
46 let job_dir = JobDir::open(&root, opts.job_id)?;
47
48 let meta = job_dir.read_meta()?;
49 let state = job_dir.read_state()?;
50
51 if *state.status() != JobStatus::Created {
53 return Err(anyhow::Error::new(InvalidJobState(format!(
54 "job {} is in '{}' state; only 'created' jobs can be started",
55 opts.job_id,
56 state.status().as_str()
57 ))));
58 }
59
60 info!(job_id = %opts.job_id, "starting created job");
61
62 let full_log_path = job_dir.full_log_path().display().to_string();
64
65 let shell_wrapper = if let Some(ref w) = meta.shell_wrapper {
67 w.clone()
68 } else {
69 crate::config::default_shell_wrapper()
70 };
71
72 let (supervisor_pid, started_at) = spawn_supervisor_process(
80 &job_dir,
81 SpawnSupervisorParams {
82 job_id: job_dir.job_id.clone(),
83 root: root.clone(),
84 full_log_path: full_log_path.clone(),
85 timeout_ms: meta.timeout_ms,
86 kill_after_ms: meta.kill_after_ms,
87 cwd: meta.cwd.clone(),
88 env_vars: meta.env_vars_runtime.clone(),
89 env_files: meta.env_files.clone(),
90 inherit_env: meta.inherit_env,
91 stdin_file: meta.stdin_file.clone(),
92 progress_every_ms: meta.progress_every_ms,
93 notify_command: meta
94 .notification
95 .as_ref()
96 .and_then(|n| n.notify_command.clone()),
97 notify_file: meta
98 .notification
99 .as_ref()
100 .and_then(|n| n.notify_file.clone()),
101 shell_wrapper,
102 command: meta.command.clone(),
103 },
104 )?;
105
106 info!(job_id = %opts.job_id, supervisor_pid, started_at = %started_at, "job started");
107
108 if !opts.no_auto_gc {
109 let mut auto_cfg = opts.auto_gc_config.clone();
110 if let Some(v) = opts.auto_gc_older_than {
111 auto_cfg.older_than = v;
112 }
113 if let Some(v) = opts.auto_gc_max_jobs {
114 auto_cfg.max_jobs = usize::try_from(v).ok();
115 }
116 if let Some(v) = opts.auto_gc_max_bytes {
117 auto_cfg.max_bytes = Some(v);
118 }
119 crate::gc::maybe_run_auto_gc(&root, &auto_cfg);
120 }
121
122 let stdout_log_path = job_dir.stdout_path().display().to_string();
123 let stderr_log_path = job_dir.stderr_path().display().to_string();
124
125 let masked_env_vars = mask_env_vars(&meta.env_vars_runtime, &meta.mask);
127 let observation = observe_inline_output(
128 &job_dir,
129 opts.wait,
130 opts.until_seconds,
131 opts.forever,
132 opts.max_bytes,
133 )?;
134
135 Response::new(
136 "start",
137 RunData {
138 job_id: job_dir.job_id.clone(),
139 state: observation.state,
140 tags: meta.tags.clone(),
141 env_vars: masked_env_vars,
142 stdout_log_path,
143 stderr_log_path,
144 elapsed_ms: 0,
145 waited_ms: observation.waited_ms,
146 stdout: observation.stdout,
147 stderr: observation.stderr,
148 stdout_range: observation.stdout_range,
149 stderr_range: observation.stderr_range,
150 stdout_total_bytes: observation.stdout_total_bytes,
151 stderr_total_bytes: observation.stderr_total_bytes,
152 encoding: observation.encoding,
153 exit_code: observation.exit_code,
154 finished_at: observation.finished_at,
155 signal: observation.signal,
156 duration_ms: observation.duration_ms,
157 },
158 )
159 .print();
160
161 Ok(())
162}