// hecate_entity/job/workflow.rs

1use std::path::PathBuf;
2
3use hecate_executor::ExecutorError;
4use log::{debug, error, info};
5use sea_orm::{ActiveModelTrait, ActiveValue::Set, DbConn, EntityTrait, IntoActiveModel};
6use thiserror::Error;
7
8use crate::{
9    Job, JobError,
10    job::{self, JobStatus, scheduler::SchedulerJobConfig},
11};
12
13#[derive(Debug, Error)]
14pub enum RunJobError {
15    #[error("failed to compile job: {0}")]
16    CompilationFailed(ExecutorError),
17    #[error("failed to execute job: {0}")]
18    ExecutorError(#[from] ExecutorError),
19    #[error("unsupported project type : {0}")]
20    UnsupportedProjectType(String),
21    #[error("failed to get remote id")]
22    MissingRemoteId,
23    #[error("failed to update job in database")]
24    DatabaseError(#[from] sea_orm::DbErr),
25}
26
27impl super::Model {
28    pub async fn cancel_job(&self) {}
29
30    pub async fn update_status(self, db: &DbConn) -> Result<super::Model, JobError> {
31        let status = self.status;
32
33        let new_status = match &self.scheduler {
34            Some(scheduler) => {
35                if self.remote_job_id.is_none() {
36                    return Err(JobError::InvalidJobConfig(
37                        "scheduler job id missing in database for scheduler job".into(),
38                    ));
39                }
40                let remote_job_id = self.remote_job_id.as_ref().unwrap();
41
42                let executor = self.executor().await?;
43                let status_cmd = scheduler.job_status_cmd(remote_job_id);
44                let response = executor.execute(&status_cmd).await?;
45                scheduler
46                    .parse_job_status(&response)
47                    .ok_or_else(|| JobError::ParsingStatusFailed(response))?
48            }
49            None => status,
50        };
51
52        let job = if new_status != status {
53            let mut job = self.into_active_model();
54            job.status = Set(new_status);
55            let job = job.update(db).await?;
56            job
57        } else {
58            self
59        };
60
61        Ok(job)
62    }
63
64    pub async fn update_job_status(job_id: i64, db: &DbConn) -> Result<Self, JobError> {
65        let job = Job::find_by_id(job_id)
66            .one(db)
67            .await?
68            .ok_or_else(|| JobError::JobNotFound(job_id))?;
69        job.update_status(db).await
70    }
71
72    pub async fn run(self, db: &DbConn) -> Result<Self, JobError> {
73        let job = self;
74        if job.cmakelists.is_none() {
75            error!("missing cmakelists, aborting");
76            Err(RunJobError::UnsupportedProjectType(
77                "Not a cmake project".to_string(),
78            ))?;
79        }
80        let cmakelists = job.cmakelists.as_deref().unwrap();
81
82        // executor.execute("touch duckies").await?;
83        // info!("duckies have appeared");
84        let name = whoami::username();
85        let job_name = &job.name;
86        let job_id = job.id;
87
88        let job_dir = String::from(format!("hecate/{name}/{job_id}#{job_name}"));
89        let sources_dir = format!("~/{job_dir}");
90        let results_dir = format!("{sources_dir}/results");
91        let executor = job.executor().await?;
92
93        info!("creating sources and results dir: {results_dir:?}");
94        executor.mkdirs(&results_dir).await?;
95
96        info!("writing source code");
97        executor
98            .write_file(
99                &format!(
100                    "{sources_dir}/{}",
101                    job.code_filename.as_deref().unwrap_or_else(|| "main.cpp")
102                ),
103                &job.code,
104            )
105            .await?;
106
107        info!("writing cmakelists");
108        executor
109            .write_file(&format!("{sources_dir}/CMakeLists.txt"), &cmakelists)
110            .await?;
111
112        let compiler = job.compiler.as_deref().unwrap_or_else(|| "g++");
113        let build_dir = PathBuf::from("/tmp").join(&job_dir).join("build");
114        let cmake_cmd = format!(
115            "cmake -DCMAKE_CXX_COMPILER={compiler} -B {} {}",
116            build_dir.display(),
117            sources_dir
118        );
119        debug!("cmake command: {cmake_cmd}");
120
121        let build_dir = build_dir.display();
122        let mut final_command = format!(
123            "source ~/.bashrc && {cmake_cmd} && cmake --build {build_dir} && OMP_PROC_BIND=spread OMP_PLACES=threads {build_dir}/run_sim"
124        );
125
126        if let Some(scheduler) = &job.scheduler {
127            final_command = scheduler.create_job_cmd(SchedulerJobConfig {
128                dir: Some(&results_dir),
129                name: Some(&format!("{job_id}#{}", &job.name)),
130                queue: job.queue.as_deref(),
131                cluster: job.cluster.as_deref(),
132                num_nodes: job.num_nodes,
133                walltime: job.walltime.as_deref(),
134                command: &final_command,
135            });
136        }
137
138        let response = executor.execute(&final_command).await?;
139        let mut remote_id_opt: Option<String> = None;
140
141        if let Some(scheduler) = &job.scheduler {
142            let remote_id = scheduler
143                .parse_job_id(&response)
144                .ok_or_else(|| RunJobError::MissingRemoteId)?;
145            info!("remote job id: {remote_id}");
146            remote_id_opt = Some(remote_id);
147        }
148
149        let mut job = job.into_active_model();
150
151        job.status = Set(JobStatus::Queued);
152        job.remote_job_id = Set(remote_id_opt);
153        let job = job.update(db).await?;
154
155        Ok(job)
156    }
157}
158pub async fn update_job_status(job_id: i64, db: &DbConn) -> Result<job::Model, JobError> {
159    let job = Job::find_by_id(job_id)
160        .one(db)
161        .await?
162        .ok_or_else(|| JobError::JobNotFound(job_id))?;
163    job.update_status(db).await
164}