1use anyhow::Result;
8use tracing::info;
9
10use crate::jobstore::{InvalidJobState, JobDir, resolve_root};
11use crate::run::{
12 SpawnSupervisorParams, mask_env_vars, observe_inline_output, spawn_supervisor_process,
13};
14use crate::schema::{JobStatus, Response, RunData};
15
/// Options consumed by [`execute`] when starting a previously created job.
#[derive(Debug)]
pub struct StartOpts<'a> {
    /// Identifier of the job to start; used to open the job directory.
    pub job_id: &'a str,
    /// Optional root override handed to `resolve_root`.
    pub root: Option<&'a str>,
    /// When `true`, `execute` skips the automatic GC call entirely.
    pub no_auto_gc: bool,
    /// When set, replaces `older_than` in the auto-GC configuration.
    pub auto_gc_older_than: Option<String>,
    /// When set, replaces `max_jobs` in the auto-GC configuration
    /// (a value that does not fit in `usize` becomes `None`).
    pub auto_gc_max_jobs: Option<u64>,
    /// When set, replaces `max_bytes` in the auto-GC configuration.
    pub auto_gc_max_bytes: Option<u64>,
    /// Base auto-GC configuration; the three `auto_gc_*` overrides above are
    /// applied on top of it.
    pub auto_gc_config: crate::gc::AutoGcConfig,
    /// Forwarded to `observe_inline_output`.
    /// NOTE(review): presumably "wait for the job before responding" — confirm.
    pub wait: bool,
    /// Forwarded to `observe_inline_output`.
    /// NOTE(review): presumably the wait deadline in seconds — confirm.
    pub until_seconds: u64,
    /// Forwarded to `observe_inline_output`.
    /// NOTE(review): presumably "wait without a deadline" — confirm.
    pub forever: bool,
    /// Forwarded to `observe_inline_output`.
    /// NOTE(review): presumably the cap on inline output bytes — confirm.
    pub max_bytes: u64,
    /// Mode passed to `crate::compress::compress` for the observed output.
    pub compression_mode: crate::compress::CompressionMode,
}
43
44pub fn execute(opts: StartOpts) -> Result<()> {
46 let root = resolve_root(opts.root);
47 let job_dir = JobDir::open(&root, opts.job_id)?;
48
49 let meta = job_dir.read_meta()?;
50 let state = job_dir.read_state()?;
51
52 if *state.status() != JobStatus::Created {
54 return Err(anyhow::Error::new(InvalidJobState(format!(
55 "job {} is in '{}' state; only 'created' jobs can be started",
56 opts.job_id,
57 state.status().as_str()
58 ))));
59 }
60
61 info!(job_id = %opts.job_id, "starting created job");
62
63 let full_log_path = job_dir.full_log_path().display().to_string();
65
66 let shell_wrapper = if let Some(ref w) = meta.shell_wrapper {
68 w.clone()
69 } else {
70 crate::config::default_shell_wrapper()
71 };
72
73 let (supervisor_pid, started_at) = spawn_supervisor_process(
81 &job_dir,
82 SpawnSupervisorParams {
83 job_id: job_dir.job_id.clone(),
84 root: root.clone(),
85 full_log_path: full_log_path.clone(),
86 timeout_ms: meta.timeout_ms,
87 kill_after_ms: meta.kill_after_ms,
88 cwd: meta.cwd.clone(),
89 env_vars: meta.env_vars_runtime.clone(),
90 env_files: meta.env_files.clone(),
91 inherit_env: meta.inherit_env,
92 stdin_file: meta.stdin_file.clone(),
93 progress_every_ms: meta.progress_every_ms,
94 notify_command: meta
95 .notification
96 .as_ref()
97 .and_then(|n| n.notify_command.clone()),
98 notify_file: meta
99 .notification
100 .as_ref()
101 .and_then(|n| n.notify_file.clone()),
102 shell_wrapper,
103 command: meta.command.clone(),
104 },
105 )?;
106
107 info!(job_id = %opts.job_id, supervisor_pid, started_at = %started_at, "job started");
108
109 if !opts.no_auto_gc {
110 let mut auto_cfg = opts.auto_gc_config.clone();
111 if let Some(v) = opts.auto_gc_older_than {
112 auto_cfg.older_than = v;
113 }
114 if let Some(v) = opts.auto_gc_max_jobs {
115 auto_cfg.max_jobs = usize::try_from(v).ok();
116 }
117 if let Some(v) = opts.auto_gc_max_bytes {
118 auto_cfg.max_bytes = Some(v);
119 }
120 crate::gc::maybe_run_auto_gc(&root, &auto_cfg);
121 }
122
123 let stdout_log_path = job_dir.stdout_path().display().to_string();
124 let stderr_log_path = job_dir.stderr_path().display().to_string();
125
126 let masked_env_vars = mask_env_vars(&meta.env_vars_runtime, &meta.mask);
128 let observation = observe_inline_output(
129 &job_dir,
130 opts.wait,
131 opts.until_seconds,
132 opts.forever,
133 opts.max_bytes,
134 )?;
135 let compression = crate::compress::compress(crate::compress::CompressionInput {
136 command: &meta.command,
137 stdout: &observation.stdout,
138 stderr: &observation.stderr,
139 stdout_original_bytes: observation.stdout_total_bytes,
140 stderr_original_bytes: observation.stderr_total_bytes,
141 mode: opts.compression_mode,
142 });
143
144 Response::new(
145 "start",
146 RunData {
147 job_id: job_dir.job_id.clone(),
148 state: observation.state,
149 tags: meta.tags.clone(),
150 env_vars: masked_env_vars,
151 stdout_log_path,
152 stderr_log_path,
153 elapsed_ms: 0,
154 waited_ms: observation.waited_ms,
155 stdout: observation.stdout,
156 stderr: observation.stderr,
157 stdout_range: observation.stdout_range,
158 stderr_range: observation.stderr_range,
159 stdout_total_bytes: observation.stdout_total_bytes,
160 stderr_total_bytes: observation.stderr_total_bytes,
161 encoding: observation.encoding,
162 exit_code: observation.exit_code,
163 finished_at: observation.finished_at,
164 signal: observation.signal,
165 duration_ms: observation.duration_ms,
166 compression,
167 },
168 )
169 .print();
170
171 Ok(())
172}