1use anyhow::Result;
8use tracing::info;
9
10use crate::jobstore::{InvalidJobState, JobDir, resolve_root};
11use crate::run::{
12 SpawnSupervisorParams, mask_env_vars, observe_inline_output, spawn_supervisor_process,
13};
14use crate::schema::{JobStatus, Response, RunData};
15
/// Options accepted by [`execute`] when starting a previously created job.
///
/// All string data is borrowed from the caller for the lifetime `'a`; the
/// struct owns nothing.
#[derive(Debug)]
pub struct StartOpts<'a> {
    /// Identifier of the job to start; used to open the job directory.
    pub job_id: &'a str,
    /// Optional root override, handed to `resolve_root` as-is.
    pub root: Option<&'a str>,
    /// Forwarded unchanged to `observe_inline_output`.
    /// NOTE(review): presumably "block until output/completion" — confirm there.
    pub wait: bool,
    /// Forwarded unchanged to `observe_inline_output`.
    /// NOTE(review): presumably an upper bound on the wait, in seconds — confirm.
    pub until_seconds: u64,
    /// Forwarded unchanged to `observe_inline_output`.
    /// NOTE(review): presumably "wait without a deadline" — confirm.
    pub forever: bool,
    /// Forwarded unchanged to `observe_inline_output`.
    /// NOTE(review): presumably a cap on inline output size, in bytes — confirm.
    pub max_bytes: u64,
}
32
33pub fn execute(opts: StartOpts) -> Result<()> {
35 let root = resolve_root(opts.root);
36 let job_dir = JobDir::open(&root, opts.job_id)?;
37
38 let meta = job_dir.read_meta()?;
39 let state = job_dir.read_state()?;
40
41 if *state.status() != JobStatus::Created {
43 return Err(anyhow::Error::new(InvalidJobState(format!(
44 "job {} is in '{}' state; only 'created' jobs can be started",
45 opts.job_id,
46 state.status().as_str()
47 ))));
48 }
49
50 info!(job_id = %opts.job_id, "starting created job");
51
52 let full_log_path = job_dir.full_log_path().display().to_string();
54
55 let shell_wrapper = if let Some(ref w) = meta.shell_wrapper {
57 w.clone()
58 } else {
59 crate::config::default_shell_wrapper()
60 };
61
62 let (supervisor_pid, started_at) = spawn_supervisor_process(
70 &job_dir,
71 SpawnSupervisorParams {
72 job_id: job_dir.job_id.clone(),
73 root: root.clone(),
74 full_log_path: full_log_path.clone(),
75 timeout_ms: meta.timeout_ms,
76 kill_after_ms: meta.kill_after_ms,
77 cwd: meta.cwd.clone(),
78 env_vars: meta.env_vars_runtime.clone(),
79 env_files: meta.env_files.clone(),
80 inherit_env: meta.inherit_env,
81 stdin_file: meta.stdin_file.clone(),
82 progress_every_ms: meta.progress_every_ms,
83 notify_command: meta
84 .notification
85 .as_ref()
86 .and_then(|n| n.notify_command.clone()),
87 notify_file: meta
88 .notification
89 .as_ref()
90 .and_then(|n| n.notify_file.clone()),
91 shell_wrapper,
92 command: meta.command.clone(),
93 },
94 )?;
95
96 info!(job_id = %opts.job_id, supervisor_pid, started_at = %started_at, "job started");
97
98 let stdout_log_path = job_dir.stdout_path().display().to_string();
99 let stderr_log_path = job_dir.stderr_path().display().to_string();
100
101 let masked_env_vars = mask_env_vars(&meta.env_vars_runtime, &meta.mask);
103 let observation = observe_inline_output(
104 &job_dir,
105 opts.wait,
106 opts.until_seconds,
107 opts.forever,
108 opts.max_bytes,
109 )?;
110
111 Response::new(
112 "start",
113 RunData {
114 job_id: job_dir.job_id.clone(),
115 state: observation.state,
116 tags: meta.tags.clone(),
117 env_vars: masked_env_vars,
118 stdout_log_path,
119 stderr_log_path,
120 elapsed_ms: 0,
121 waited_ms: observation.waited_ms,
122 stdout: observation.stdout,
123 stderr: observation.stderr,
124 stdout_range: observation.stdout_range,
125 stderr_range: observation.stderr_range,
126 stdout_total_bytes: observation.stdout_total_bytes,
127 stderr_total_bytes: observation.stderr_total_bytes,
128 encoding: observation.encoding,
129 exit_code: observation.exit_code,
130 finished_at: observation.finished_at,
131 signal: observation.signal,
132 duration_ms: observation.duration_ms,
133 },
134 )
135 .print();
136
137 Ok(())
138}