hecate_entity/job/
workflow.rs1use std::path::PathBuf;
2
3use hecate_executor::ExecutorError;
4use log::{debug, error, info};
5use sea_orm::{ActiveModelTrait, ActiveValue::Set, DbConn, EntityTrait, IntoActiveModel};
6use thiserror::Error;
7
8use crate::{
9 Job, JobError,
10 job::{self, JobStatus, scheduler::SchedulerJobConfig},
11};
12
13#[derive(Debug, Error)]
14pub enum RunJobError {
15 #[error("failed to compile job: {0}")]
16 CompilationFailed(ExecutorError),
17 #[error("failed to execute job: {0}")]
18 ExecutorError(#[from] ExecutorError),
19 #[error("unsupported project type : {0}")]
20 UnsupportedProjectType(String),
21 #[error("failed to get remote id")]
22 MissingRemoteId,
23 #[error("failed to update job in database")]
24 DatabaseError(#[from] sea_orm::DbErr),
25}
26
27impl super::Model {
28 pub async fn cancel_job(&self) {}
29
30 pub async fn update_status(self, db: &DbConn) -> Result<super::Model, JobError> {
31 let status = self.status;
32
33 let new_status = match &self.scheduler {
34 Some(scheduler) => {
35 if self.remote_job_id.is_none() {
36 return Err(JobError::InvalidJobConfig(
37 "scheduler job id missing in database for scheduler job".into(),
38 ));
39 }
40 let remote_job_id = self.remote_job_id.as_ref().unwrap();
41
42 let executor = self.executor().await?;
43 let status_cmd = scheduler.job_status_cmd(remote_job_id);
44 let response = executor.execute(&status_cmd).await?;
45 scheduler
46 .parse_job_status(&response)
47 .ok_or_else(|| JobError::ParsingStatusFailed(response))?
48 }
49 None => status,
50 };
51
52 let job = if new_status != status {
53 let mut job = self.into_active_model();
54 job.status = Set(new_status);
55 let job = job.update(db).await?;
56 job
57 } else {
58 self
59 };
60
61 Ok(job)
62 }
63
64 pub async fn update_job_status(job_id: i64, db: &DbConn) -> Result<Self, JobError> {
65 let job = Job::find_by_id(job_id)
66 .one(db)
67 .await?
68 .ok_or_else(|| JobError::JobNotFound(job_id))?;
69 job.update_status(db).await
70 }
71
72 pub async fn run(self, db: &DbConn) -> Result<Self, JobError> {
73 let job = self;
74 if job.cmakelists.is_none() {
75 error!("missing cmakelists, aborting");
76 Err(RunJobError::UnsupportedProjectType(
77 "Not a cmake project".to_string(),
78 ))?;
79 }
80 let cmakelists = job.cmakelists.as_deref().unwrap();
81
82 let name = whoami::username();
85 let job_name = &job.name;
86 let job_id = job.id;
87
88 let job_dir = String::from(format!("hecate/{name}/{job_id}#{job_name}"));
89 let sources_dir = format!("~/{job_dir}");
90 let results_dir = format!("{sources_dir}/results");
91 let executor = job.executor().await?;
92
93 info!("creating sources and results dir: {results_dir:?}");
94 executor.mkdirs(&results_dir).await?;
95
96 info!("writing source code");
97 executor
98 .write_file(
99 &format!(
100 "{sources_dir}/{}",
101 job.code_filename.as_deref().unwrap_or_else(|| "main.cpp")
102 ),
103 &job.code,
104 )
105 .await?;
106
107 info!("writing cmakelists");
108 executor
109 .write_file(&format!("{sources_dir}/CMakeLists.txt"), &cmakelists)
110 .await?;
111
112 let compiler = job.compiler.as_deref().unwrap_or_else(|| "g++");
113 let build_dir = PathBuf::from("/tmp").join(&job_dir).join("build");
114 let cmake_cmd = format!(
115 "cmake -DCMAKE_CXX_COMPILER={compiler} -B {} {}",
116 build_dir.display(),
117 sources_dir
118 );
119 debug!("cmake command: {cmake_cmd}");
120
121 let build_dir = build_dir.display();
122 let mut final_command = format!(
123 "source ~/.bashrc && {cmake_cmd} && cmake --build {build_dir} && OMP_PROC_BIND=spread OMP_PLACES=threads {build_dir}/run_sim"
124 );
125
126 if let Some(scheduler) = &job.scheduler {
127 final_command = scheduler.create_job_cmd(SchedulerJobConfig {
128 dir: Some(&results_dir),
129 name: Some(&format!("{job_id}#{}", &job.name)),
130 queue: job.queue.as_deref(),
131 cluster: job.cluster.as_deref(),
132 num_nodes: job.num_nodes,
133 walltime: job.walltime.as_deref(),
134 command: &final_command,
135 });
136 }
137
138 let response = executor.execute(&final_command).await?;
139 let mut remote_id_opt: Option<String> = None;
140
141 if let Some(scheduler) = &job.scheduler {
142 let remote_id = scheduler
143 .parse_job_id(&response)
144 .ok_or_else(|| RunJobError::MissingRemoteId)?;
145 info!("remote job id: {remote_id}");
146 remote_id_opt = Some(remote_id);
147 }
148
149 let mut job = job.into_active_model();
150
151 job.status = Set(JobStatus::Queued);
152 job.remote_job_id = Set(remote_id_opt);
153 let job = job.update(db).await?;
154
155 Ok(job)
156 }
157}
158pub async fn update_job_status(job_id: i64, db: &DbConn) -> Result<job::Model, JobError> {
159 let job = Job::find_by_id(job_id)
160 .one(db)
161 .await?
162 .ok_or_else(|| JobError::JobNotFound(job_id))?;
163 job.update_status(db).await
164}