gosh_remote/jobhub.rs
1// [[file:../remote.note::c67d342c][c67d342c]]
2use crate::common::*;
3use crate::Client;
4
5use gchemol::Molecule;
6use gosh_model::Computed;
7
8use std::path::{Path, PathBuf};
9// c67d342c ends here
10
11// [[file:../remote.note::a3bb4770][a3bb4770]]
12/// A job hub for parallel running of multiple jobs over remote
13/// computational nodes
14pub struct JobHub {
15 // the client for sending requests
16 client: Client,
17 // the molcules to be computed
18 jobs: Vec<Molecule>,
19 // the computed results
20 results: Vec<Result<Computed>>,
21}
22// a3bb4770 ends here
23
24// [[file:../remote.note::b2b2f089][b2b2f089]]
25use gut::rayon;
26
27impl JobHub {
28 /// Create a job hub for background scheduler specified in
29 /// `scheduler_address`.
30 pub fn new(scheduler_address: &str) -> Self {
31 let client = Client::connect(&scheduler_address);
32 Self {
33 client,
34 jobs: vec![],
35 results: vec![],
36 }
37 }
38
39 /// Return the number of parallel threads.
40 pub fn num_threads() -> usize {
41 rayon::current_num_threads()
42 }
43
44 // FIXME: do not work, as already set
45 // /// Set up number of threads for parallel run.
46 // pub fn set_num_threads(n: usize) -> Result<()> {
47 // rayon::ThreadPoolBuilder::new().num_threads(n).build_global()?;
48 // Ok(())
49 // }
50}
51// b2b2f089 ends here
52
53// [[file:../remote.note::c3d41589][c3d41589]]
54impl JobHub {
55 /// Add a new mol into job hub for later computation. Return associated
56 /// jobid which can be used to retrive computation result.
57 pub fn add_job(&mut self, mol: Molecule) -> usize {
58 self.jobs.push(mol);
59 self.jobs.len() - 1
60 }
61
62 /// Return the numbrer of pending jobs.
63 pub fn njobs(&self) -> usize {
64 self.jobs.len()
65 }
66
67 /// Run all scheduled jobs with nodes in pool. Call this method
68 /// will overwrite computed results and clear pending jobs.
69 pub fn run(&mut self) -> Result<()> {
70 self.results = self
71 .jobs
72 .par_iter()
73 .map(|mol| self.client.compute_molecule_blockly(mol))
74 .collect();
75
76 // clear pending jobs
77 self.jobs.clear();
78
79 Ok(())
80 }
81
82 /// Return computed result for job `jobid`.
83 pub fn get_computed(&mut self, jobid: usize) -> Result<Computed> {
84 let computed = self.results.get(jobid).ok_or(anyhow!("no such job {jobid}"))?;
85 match computed {
86 Err(e) => bail!("job {jobid} failed with error: {e:?}"),
87 Ok(r) => Ok(r.to_owned()),
88 }
89 }
90}
91// c3d41589 ends here