1use anyhow::Result;
8use tracing::info;
9
10use crate::jobstore::{InvalidJobState, JobDir, resolve_root};
11use crate::run::{
12 SnapshotWaitOpts, SpawnSupervisorParams, mask_env_vars, run_snapshot_wait,
13 spawn_supervisor_process,
14};
15use crate::schema::{JobStatus, Response, RunData};
16
17#[derive(Debug)]
19pub struct StartOpts<'a> {
20 pub job_id: &'a str,
22 pub root: Option<&'a str>,
24 pub snapshot_after: u64,
26 pub tail_lines: u64,
28 pub max_bytes: u64,
30 pub wait: bool,
32 pub wait_poll_ms: u64,
34}
35
36pub fn execute(opts: StartOpts) -> Result<()> {
38 let root = resolve_root(opts.root);
39 let job_dir = JobDir::open(&root, opts.job_id)?;
40
41 let meta = job_dir.read_meta()?;
42 let state = job_dir.read_state()?;
43
44 if *state.status() != JobStatus::Created {
46 return Err(anyhow::Error::new(InvalidJobState(format!(
47 "job {} is in '{}' state; only 'created' jobs can be started",
48 opts.job_id,
49 state.status().as_str()
50 ))));
51 }
52
53 info!(job_id = %opts.job_id, "starting created job");
54
55 let full_log_path = job_dir.full_log_path().display().to_string();
57
58 let shell_wrapper = if let Some(ref w) = meta.shell_wrapper {
60 w.clone()
61 } else {
62 crate::config::default_shell_wrapper()
63 };
64
65 let (supervisor_pid, started_at) = spawn_supervisor_process(
73 &job_dir,
74 SpawnSupervisorParams {
75 job_id: opts.job_id.to_string(),
76 root: root.clone(),
77 full_log_path: full_log_path.clone(),
78 timeout_ms: meta.timeout_ms,
79 kill_after_ms: meta.kill_after_ms,
80 cwd: meta.cwd.clone(),
81 env_vars: meta.env_vars_runtime.clone(),
82 env_files: meta.env_files.clone(),
83 inherit_env: meta.inherit_env,
84 progress_every_ms: meta.progress_every_ms,
85 notify_command: meta
86 .notification
87 .as_ref()
88 .and_then(|n| n.notify_command.clone()),
89 notify_file: meta
90 .notification
91 .as_ref()
92 .and_then(|n| n.notify_file.clone()),
93 shell_wrapper,
94 command: meta.command.clone(),
95 },
96 )?;
97
98 info!(job_id = %opts.job_id, supervisor_pid, started_at = %started_at, "job started");
99
100 let stdout_log_path = job_dir.stdout_path().display().to_string();
101 let stderr_log_path = job_dir.stderr_path().display().to_string();
102
103 let elapsed_start = std::time::Instant::now();
104
105 let (final_state, exit_code_opt, finished_at_opt, snapshot, final_snapshot_opt, waited_ms) =
106 run_snapshot_wait(
107 &job_dir,
108 &SnapshotWaitOpts {
109 snapshot_after: opts.snapshot_after,
110 tail_lines: opts.tail_lines,
111 max_bytes: opts.max_bytes,
112 wait: opts.wait,
113 wait_poll_ms: opts.wait_poll_ms,
114 },
115 );
116
117 let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
118
119 let masked_env_vars = mask_env_vars(&meta.env_vars_runtime, &meta.mask);
121
122 Response::new(
123 "start",
124 RunData {
125 job_id: opts.job_id.to_string(),
126 state: final_state,
127 tags: meta.tags.clone(),
128 env_vars: masked_env_vars,
129 snapshot,
130 stdout_log_path,
131 stderr_log_path,
132 waited_ms,
133 elapsed_ms,
134 exit_code: exit_code_opt,
135 finished_at: finished_at_opt,
136 final_snapshot: final_snapshot_opt,
137 },
138 )
139 .print();
140
141 Ok(())
142}