use anyhow::Result;
use tracing::info;
use crate::jobstore::{InvalidJobState, JobDir, resolve_root};
use crate::run::{
SnapshotWaitOpts, SpawnSupervisorParams, mask_env_vars, run_snapshot_wait,
spawn_supervisor_process,
};
use crate::schema::{JobStatus, Response, RunData};
#[derive(Debug)]
pub struct StartOpts<'a> {
pub job_id: &'a str,
pub root: Option<&'a str>,
pub snapshot_after: u64,
pub tail_lines: u64,
pub max_bytes: u64,
pub wait: bool,
pub wait_poll_ms: u64,
}
pub fn execute(opts: StartOpts) -> Result<()> {
let root = resolve_root(opts.root);
let job_dir = JobDir::open(&root, opts.job_id)?;
let meta = job_dir.read_meta()?;
let state = job_dir.read_state()?;
if *state.status() != JobStatus::Created {
return Err(anyhow::Error::new(InvalidJobState(format!(
"job {} is in '{}' state; only 'created' jobs can be started",
opts.job_id,
state.status().as_str()
))));
}
info!(job_id = %opts.job_id, "starting created job");
let full_log_path = job_dir.full_log_path().display().to_string();
let shell_wrapper = if let Some(ref w) = meta.shell_wrapper {
w.clone()
} else {
crate::config::default_shell_wrapper()
};
let (supervisor_pid, started_at) = spawn_supervisor_process(
&job_dir,
SpawnSupervisorParams {
job_id: job_dir.job_id.clone(),
root: root.clone(),
full_log_path: full_log_path.clone(),
timeout_ms: meta.timeout_ms,
kill_after_ms: meta.kill_after_ms,
cwd: meta.cwd.clone(),
env_vars: meta.env_vars_runtime.clone(),
env_files: meta.env_files.clone(),
inherit_env: meta.inherit_env,
progress_every_ms: meta.progress_every_ms,
notify_command: meta
.notification
.as_ref()
.and_then(|n| n.notify_command.clone()),
notify_file: meta
.notification
.as_ref()
.and_then(|n| n.notify_file.clone()),
shell_wrapper,
command: meta.command.clone(),
},
)?;
info!(job_id = %opts.job_id, supervisor_pid, started_at = %started_at, "job started");
let stdout_log_path = job_dir.stdout_path().display().to_string();
let stderr_log_path = job_dir.stderr_path().display().to_string();
let elapsed_start = std::time::Instant::now();
let (final_state, exit_code_opt, finished_at_opt, snapshot, final_snapshot_opt, waited_ms) =
run_snapshot_wait(
&job_dir,
&SnapshotWaitOpts {
snapshot_after: opts.snapshot_after,
tail_lines: opts.tail_lines,
max_bytes: opts.max_bytes,
wait: opts.wait,
wait_poll_ms: opts.wait_poll_ms,
wait_until_ms: 0,
wait_forever: true,
},
);
let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
let masked_env_vars = mask_env_vars(&meta.env_vars_runtime, &meta.mask);
Response::new(
"start",
RunData {
job_id: job_dir.job_id.clone(),
state: final_state,
tags: meta.tags.clone(),
env_vars: masked_env_vars,
snapshot,
stdout_log_path,
stderr_log_path,
waited_ms,
elapsed_ms,
exit_code: exit_code_opt,
finished_at: finished_at_opt,
final_snapshot: final_snapshot_opt,
},
)
.print();
Ok(())
}