gosh_remote/
jobhub.rs

1// [[file:../remote.note::c67d342c][c67d342c]]
2use crate::common::*;
3use crate::Client;
4
5use gchemol::Molecule;
6use gosh_model::Computed;
7
8use std::path::{Path, PathBuf};
9// c67d342c ends here
10
11// [[file:../remote.note::a3bb4770][a3bb4770]]
12/// A job hub for parallel running of multiple jobs over remote
13/// computational nodes
14pub struct JobHub {
15    // the client for sending requests
16    client: Client,
17    // the molcules to be computed
18    jobs: Vec<Molecule>,
19    // the computed results
20    results: Vec<Result<Computed>>,
21}
22// a3bb4770 ends here
23
24// [[file:../remote.note::b2b2f089][b2b2f089]]
25use gut::rayon;
26
27impl JobHub {
28    /// Create a job hub for background scheduler specified in
29    /// `scheduler_address`.
30    pub fn new(scheduler_address: &str) -> Self {
31        let client = Client::connect(&scheduler_address);
32        Self {
33            client,
34            jobs: vec![],
35            results: vec![],
36        }
37    }
38
39    /// Return the number of parallel threads.
40    pub fn num_threads() -> usize {
41        rayon::current_num_threads()
42    }
43
44    // FIXME: do not work, as already set
45    // /// Set up number of threads for parallel run.
46    // pub fn set_num_threads(n: usize) -> Result<()> {
47    //     rayon::ThreadPoolBuilder::new().num_threads(n).build_global()?;
48    //     Ok(())
49    // }
50}
51// b2b2f089 ends here
52
53// [[file:../remote.note::c3d41589][c3d41589]]
54impl JobHub {
55    /// Add a new mol into job hub for later computation. Return associated
56    /// jobid which can be used to retrive computation result.
57    pub fn add_job(&mut self, mol: Molecule) -> usize {
58        self.jobs.push(mol);
59        self.jobs.len() - 1
60    }
61
62    /// Return the numbrer of pending jobs.
63    pub fn njobs(&self) -> usize {
64        self.jobs.len()
65    }
66
67    /// Run all scheduled jobs with nodes in pool. Call this method
68    /// will overwrite computed results and clear pending jobs.
69    pub fn run(&mut self) -> Result<()> {
70        self.results = self
71            .jobs
72            .par_iter()
73            .map(|mol| self.client.compute_molecule_blockly(mol))
74            .collect();
75
76        // clear pending jobs
77        self.jobs.clear();
78
79        Ok(())
80    }
81
82    /// Return computed result for job `jobid`.
83    pub fn get_computed(&mut self, jobid: usize) -> Result<Computed> {
84        let computed = self.results.get(jobid).ok_or(anyhow!("no such job {jobid}"))?;
85        match computed {
86            Err(e) => bail!("job {jobid} failed with error: {e:?}"),
87            Ok(r) => Ok(r.to_owned()),
88        }
89    }
90}
91// c3d41589 ends here