gosh_runner/
job.rs

1// [[file:../runners.note::9b1f2893][9b1f2893]]
2//! For handling running task/job
3use super::*;
4
5use serde::{Deserialize, Serialize};
6use tempfile::{tempdir, tempdir_in, TempDir};
7// 9b1f2893 ends here
8
9// [[file:../runners.note::*job][job:1]]
/// Represents a computational job inputted by user.
///
/// A `Job` only describes the work to be done: the script to run, the text
/// fed to its stdin, and the names of the files used for the script and its
/// input/output streams. These file names are joined onto the working
/// directory of the `Computation` the job is turned into.
#[derive(Debug, Deserialize, Serialize)]
pub struct Job {
    /// Input string for stdin
    input: String,

    /// The content of running script
    script: String,

    /// Path to a file for saving input stream of computation
    /// (`job.inp` when built with `Job::new`).
    inp_file: PathBuf,

    /// Path to a file for saving output stream of computation
    /// (`job.out` when built with `Job::new`).
    out_file: PathBuf,

    /// Path to a file for saving error stream of computation
    /// (`job.err` when built with `Job::new`).
    err_file: PathBuf,

    /// Path to a script file that defining how to start computation
    /// (`run` when built with `Job::new`).
    run_file: PathBuf,

    /// Extra files required for computation. Entries are added with
    /// `attach_file`, which refuses duplicates.
    extra_files: Vec<PathBuf>,
}
34
35impl Job {
36    /// Construct a Job running shell script.
37    ///
38    /// # Parameters
39    ///
40    /// * script: the content of the script for running the job.
41    ///
42    pub fn new(script: &str) -> Self {
43        Self {
44            script: script.into(),
45            input: String::new(),
46
47            out_file: "job.out".into(),
48            err_file: "job.err".into(),
49            run_file: "run".into(),
50            inp_file: "job.inp".into(),
51            extra_files: vec![],
52        }
53    }
54
55    /// Add a new file into extra-files list.
56    pub fn attach_file<P: AsRef<Path>>(&mut self, file: P) {
57        let file: PathBuf = file.as_ref().into();
58        if !self.extra_files.contains(&file) {
59            self.extra_files.push(file);
60        } else {
61            warn!("try to attach a dumplicated file: {}!", file.display());
62        }
63    }
64}
65// job:1 ends here
66
67// [[file:../runners.note::*base][base:1]]
/// Computation represents a submitted `Job`
///
/// It bundles the job description with the temporary working directory the
/// job files are written to and, once the job has been started, the process
/// session of the running command.
pub struct Computation {
    /// The job description this computation was created from.
    job: Job,

    // command session. The drop order is above Tempdir: struct fields are
    // dropped in declaration order, so the session goes away before the
    // working directory is removed. `None` until the job has been started.
    session: Option<crate::process::Session<tokio::process::Child>>,

    /// The working directory of computation
    wrk_dir: TempDir,
}
78// base:1 ends here
79
80// [[file:../runners.note::*paths][paths:1]]
81impl Computation {
82    /// The full path to the working directory for running the job.
83    pub fn wrk_dir(&self) -> &Path {
84        self.wrk_dir.path()
85    }
86
87    /// The full path to computation input file (stdin).
88    pub fn inp_file(&self) -> PathBuf {
89        self.wrk_dir().join(&self.job.inp_file)
90    }
91
92    /// The full path to computation output file (stdout).
93    pub fn out_file(&self) -> PathBuf {
94        self.wrk_dir().join(&self.job.out_file)
95    }
96
97    /// The full path to computation error file (stderr).
98    pub fn err_file(&self) -> PathBuf {
99        self.wrk_dir().join(&self.job.err_file)
100    }
101
102    /// The full path to the script for running the job.
103    pub fn run_file(&self) -> PathBuf {
104        self.wrk_dir().join(&self.job.run_file)
105    }
106}
107// paths:1 ends here
108
109// [[file:../runners.note::*core][core:1]]
110use tokio::io::AsyncWriteExt;
111
112impl Job {
113    /// Submit the job and turn it into Computation.
114    pub fn submit(self) -> Computation {
115        Computation::new(self)
116    }
117}
118
119impl Computation {
120    /// Construct `Computation` of user inputted `Job`.
121    pub fn new(job: Job) -> Self {
122        use std::fs::File;
123        use std::os::unix::fs::OpenOptionsExt;
124
125        // create working directory in scratch space.
126        let wdir = tempfile::TempDir::new_in(".").expect("temp dir");
127        let session = Computation {
128            job,
129            wrk_dir: wdir.into(),
130            session: None,
131        };
132
133        // create run file
134        let file = session.run_file();
135
136        // make run script executable
137        match std::fs::OpenOptions::new()
138            .create(true)
139            .write(true)
140            .mode(0o770)
141            .open(&file)
142        {
143            Ok(mut f) => {
144                let _ = f.write_all(session.job.script.as_bytes());
145                trace!("script content wrote to: {}.", file.display());
146            }
147            Err(e) => {
148                panic!("Error whiling creating job run file: {}", e);
149            }
150        }
151        let file = session.inp_file();
152        match File::create(&session.inp_file()) {
153            Ok(mut f) => {
154                let _ = f.write_all(session.job.input.as_bytes());
155                trace!("input content wrote to: {}.", file.display());
156            }
157            Err(e) => {
158                panic!("Error while creating job input file: {}", e);
159            }
160        }
161
162        session
163    }
164
165    /// Wait for background command to complete.
166    async fn wait(&mut self) -> Result<()> {
167        if let Some(s) = self.session.as_mut() {
168            let ecode = s.child.wait().await?;
169            info!("job session exited: {}", ecode);
170        } else {
171            error!("Job not started yet.");
172        }
173        Ok(())
174    }
175
176    /// Run command in background.
177    async fn start(&mut self) -> Result<()> {
178        use crate::process::SpawnSessionExt;
179
180        let wdir = self.wrk_dir();
181        info!("job work direcotry: {}", wdir.display());
182
183        let mut session = tokio::process::Command::new(&self.run_file())
184            .current_dir(wdir)
185            .stdin(std::process::Stdio::piped())
186            .stdout(std::process::Stdio::piped())
187            .stderr(std::process::Stdio::piped())
188            .spawn_session()?;
189
190        let mut stdin = session
191            .child
192            .stdin
193            .take()
194            .expect("child did not have a handle to stdout");
195        let mut stdout = session
196            .child
197            .stdout
198            .take()
199            .expect("child did not have a handle to stdout");
200        let mut stderr = session
201            .child
202            .stderr
203            .take()
204            .expect("child did not have a handle to stderr");
205
206        // NOTE: suppose stdin stream is small.
207        stdin.write_all(self.job.input.as_bytes()).await;
208
209        // redirect stdout and stderr to files for user inspection.
210        let mut fout = tokio::fs::File::create(self.out_file()).await?;
211        let mut ferr = tokio::fs::File::create(self.err_file()).await?;
212        tokio::io::copy(&mut stdout, &mut fout).await?;
213        tokio::io::copy(&mut stderr, &mut ferr).await?;
214
215        let sid = session.handler().id();
216        info!("command running in session {:?}", sid);
217        self.session = session.into();
218
219        Ok(())
220    }
221
222    /// Return true if session already has been started.
223    fn is_started(&self) -> bool {
224        self.session.is_some()
225    }
226}
227// core:1 ends here
228
229// [[file:../runners.note::*extra][extra:1]]
230impl Computation {
231    /// Return a list of full path to extra files required for computation.
232    pub fn extra_files(&self) -> Vec<PathBuf> {
233        self.job.extra_files.iter().map(|f| self.wrk_dir().join(f)).collect()
234    }
235
236    /// Check if job has been done correctly.
237    pub fn is_done(&self) -> bool {
238        let inpfile = self.inp_file();
239        let outfile = self.out_file();
240        let errfile = self.err_file();
241
242        if self.wrk_dir().is_dir() {
243            if outfile.is_file() && inpfile.is_file() {
244                if let Ok(time2) = outfile.metadata().and_then(|m| m.modified()) {
245                    if let Ok(time1) = inpfile.metadata().and_then(|m| m.modified()) {
246                        if time2 >= time1 {
247                            return true;
248                        }
249                    }
250                }
251            }
252        }
253
254        false
255    }
256
257    /// Update file timestamps to make sure `is_done` call return true.
258    pub fn fake_done(&self) {
259        todo!()
260    }
261}
262// extra:1 ends here
263
264// [[file:../runners.note::f4436dc6][f4436dc6]]
265mod db {
266    use super::*;
267
268    use bytes::Bytes;
269    use std::sync::Arc;
270    use tokio::sync::Mutex;
271
272    pub use super::impl_jobs_slotmap::Id;
273    use super::impl_jobs_slotmap::JobKey;
274    use super::impl_jobs_slotmap::Jobs;
275
276    /// A simple in-memory DB for computational jobs.
277    #[derive(Clone)]
278    pub struct Db {
279        inner: Arc<Mutex<Jobs>>,
280    }
281
282    impl Db {
283        /// Create an empty `Db`
284        pub fn new() -> Self {
285            Self {
286                inner: Arc::new(Mutex::new(Jobs::new())),
287            }
288        }
289
290        /// Update the job in `id` with a `new_job`. Return error if job `id`
291        /// has been started.
292        pub async fn update_job(&mut self, id: JobId, new_job: Job) -> Result<()> {
293            debug!("update_job: id={}, job={:?}", id, new_job);
294            let mut jobs = self.inner.lock().await;
295            let k = jobs.check_job(id)?;
296            if jobs[k].is_started() {
297                bail!("job {} has been started", id);
298            } else {
299                jobs[k] = new_job.submit();
300            }
301
302            Ok(())
303        }
304
305        /// Return a full list of submitted jobs
306        pub async fn get_job_list(&self) -> Vec<JobId> {
307            self.inner.lock().await.iter().map(|(k, _)| k).collect()
308        }
309
310        /// Put a new file on working directory of job `id`
311        pub async fn put_job_file(&mut self, id: JobId, file: String, body: Bytes) -> Result<()> {
312            debug!("put_job_file: id={}", id);
313
314            let jobs = self.inner.lock().await;
315            let id = jobs.check_job(id)?;
316
317            let job = &jobs[id];
318            let p = job.wrk_dir().join(&file);
319            info!("client request to put a file: {}", p.display());
320            match std::fs::File::create(p) {
321                Ok(mut f) => {
322                    f.write_all(&body).context("write job file")?;
323                    Ok(())
324                }
325                Err(e) => {
326                    bail!("create file error:\n{}", e);
327                }
328            }
329        }
330
331        /// Return the content of `file` for job `id`
332        pub async fn get_job_file(&self, id: JobId, file: &Path) -> Result<Vec<u8>> {
333            debug!("get_job_file: id={}", id);
334            let jobs = self.inner.lock().await;
335            let k = jobs.check_job(id)?;
336            let job = &jobs[k];
337            let p = job.wrk_dir().join(&file);
338            info!("client request file: {}", p.display());
339
340            let mut buffer = Vec::new();
341            let _ = std::fs::File::open(p)
342                .context("open file")?
343                .read_to_end(&mut buffer)
344                .context("read file")?;
345            Ok(buffer)
346        }
347
348        /// List files in working directory of Job `id`.
349        pub async fn list_job_files(&self, id: JobId) -> Result<Vec<PathBuf>> {
350            info!("list files for job {}", id);
351            let jobs = self.inner.lock().await;
352            let id = jobs.check_job(id)?;
353
354            let mut list = vec![];
355            let job = &jobs[id];
356            for entry in std::fs::read_dir(job.wrk_dir()).context("list dir")? {
357                if let Ok(entry) = entry {
358                    let p = entry.path();
359                    if p.is_file() {
360                        list.push(p);
361                    }
362                }
363            }
364            Ok(list)
365        }
366
367        /// Remove all jobs from `Db`. If the job has been started, the child
368        /// processes will be terminated.
369        pub async fn clear_jobs(&mut self) {
370            self.inner.lock().await.clear();
371        }
372
373        /// Remove the job `id` from `Db`. If the job has been started, it will
374        /// be terminated.
375        pub async fn delete_job(&mut self, id: JobId) -> Result<()> {
376            info!("delete_job: id={}", id);
377            self.inner.lock().await.remove(id)?;
378            Ok(())
379        }
380
381        /// Insert job into the queue.
382        pub async fn insert_job(&mut self, mut job: Job) -> JobId {
383            info!("create_job: {:?}", job);
384            let mut jobs = self.inner.lock().await;
385            let jid = jobs.insert(job.submit());
386            info!("Job {} created.", jid);
387            jid
388        }
389
390        /// Start the job in background, and wait until it finish.
391        pub async fn wait_job(&self, id: JobId) -> Result<()> {
392            info!("wait_job: id={}", id);
393            let mut jobs = self.inner.lock().await;
394            let k = jobs.check_job(id)?;
395            jobs[k].start().await?;
396            jobs[k].wait().await?;
397            Ok(())
398        }
399    }
400}
401// f4436dc6 ends here
402
403// [[file:../runners.note::*slotmap][slotmap:1]]
404mod impl_jobs_slotmap {
405    use super::*;
406
407    use bimap::BiMap;
408    use slotmap::Key;
409    use slotmap::{DefaultKey, SlotMap};
410
411    /// The job `Id` from user side
412    pub type Id = usize;
413
414    pub(super) type JobKey = DefaultKey;
415
416    pub struct Jobs {
417        inner: SlotMap<DefaultKey, Computation>,
418        mapping: BiMap<usize, JobKey>,
419    }
420
421    impl Jobs {
422        /// Create empty `Jobs`
423        pub fn new() -> Self {
424            Self {
425                inner: SlotMap::new(),
426                mapping: BiMap::new(),
427            }
428        }
429
430        /// Look for the Job with `id`, returning error if the job with `id`
431        /// does not exist.
432        pub fn check_job(&self, id: Id) -> Result<JobKey> {
433            if let Some(&k) = self.mapping.get_by_left(&id) {
434                Ok(k)
435            } else {
436                bail!("Job id not found: {}", id);
437            }
438        }
439
440        /// Insert a new Job into database, returning Id for later operations.
441        pub fn insert(&mut self, job: Computation) -> Id {
442            let k = self.inner.insert(job);
443            let n = self.mapping.len() + 1;
444            if let Err(e) = self.mapping.insert_no_overwrite(n, k) {
445                panic!("invalid {:?}", e);
446            }
447            n
448        }
449
450        /// Remove the job with `id`
451        pub fn remove(&mut self, id: Id) -> Result<()> {
452            let k = self.check_job(id)?;
453            let job = &self.inner[k];
454            if job.is_started() {
455                info!("Job {} has been started.", id);
456            }
457            // The session will be terminated on drop
458            let _ = self.inner.remove(k);
459            Ok(())
460        }
461
462        /// Remove all created jobs
463        pub fn clear(&mut self) {
464            for (k, job) in self.inner.iter() {
465                if job.is_started() {
466                    info!("job {} already started.", self.to_id(k));
467                }
468            }
469            // The session will be terminated on drop
470            self.inner.clear();
471        }
472
473        /// Iterator over a tuple of `Id` and `Job`.
474        pub fn iter(&self) -> impl Iterator<Item = (Id, &Computation)> {
475            self.inner.iter().map(move |(k, v)| (self.to_id(k), v))
476        }
477
478        fn to_id(&self, k: JobKey) -> Id {
479            if let Some(&id) = self.mapping.get_by_right(&k) {
480                id
481            } else {
482                panic!("invalid job key {:?}", k);
483            }
484        }
485    }
486
487    impl std::ops::Index<JobKey> for Jobs {
488        type Output = Computation;
489
490        fn index(&self, key: JobKey) -> &Self::Output {
491            &self.inner[key]
492        }
493    }
494
495    impl std::ops::IndexMut<JobKey> for Jobs {
496        fn index_mut(&mut self, key: JobKey) -> &mut Self::Output {
497            &mut self.inner[key]
498        }
499    }
500}
501// slotmap:1 ends here
502
503// [[file:../runners.note::*pub][pub:1]]
504pub use self::db::Db;
505pub use self::db::Id as JobId;
506// pub:1 ends here