gosh_remote/
base.rs

1// [[file:../remote.note::fed8a9d3][fed8a9d3]]
2use super::*;
3// fed8a9d3 ends here
4
5// [[file:../remote.note::50e6ed5a][50e6ed5a]]
6/// Represents a computational job inputted by user.
7#[derive(Debug, Clone, Deserialize, Serialize)]
8#[serde(default)]
9pub struct Job {
10    // FIXME: remove pub
11    /// The content of running script
12    pub(crate) script: String,
13
14    /// A unique random name
15    name: String,
16
17    /// Path to a file for saving output stream of computation.
18    pub out_file: PathBuf,
19
20    /// Path to a file for saving error stream of computation.
21    pub err_file: PathBuf,
22
23    /// Path to a script file that defining how to start computation
24    pub run_file: PathBuf,
25}
26
27impl Default for Job {
28    fn default() -> Self {
29        Self {
30            script: "pwd".into(),
31            name: random_name(),
32            out_file: "job.out".into(),
33            err_file: "job.err".into(),
34            run_file: "run".into(),
35        }
36    }
37}
38
39impl Job {
40    /// Construct a Job running shell script.
41    ///
42    /// # Parameters
43    ///
44    /// * script: the content of the script for running the job.
45    ///
46    pub fn new(script: impl Into<String>) -> Self {
47        Self {
48            script: script.into(),
49            ..Default::default()
50        }
51    }
52
53    /// Set job name.
54    pub fn with_name(mut self, name: &str) -> Self {
55        self.name = name.into();
56        self
57    }
58
59    /// Return the job name
60    pub fn name(&self) -> String {
61        self.name.clone()
62    }
63}
64
65fn random_name() -> String {
66    use rand::distributions::Alphanumeric;
67    use rand::Rng;
68
69    let mut rng = rand::thread_rng();
70    std::iter::repeat(())
71        .map(|()| rng.sample(Alphanumeric))
72        .map(char::from)
73        .take(6)
74        .collect()
75}
76// 50e6ed5a ends here
77
78// [[file:../remote.note::769262a8][769262a8]]
79mod node {
80    use super::*;
81    use crossbeam_channel::{unbounded, Receiver, Sender};
82
83    /// Represents a remote node for computation
84    #[derive(Debug, Clone, Deserialize, Serialize)]
85    pub struct Node {
86        name: String,
87    }
88
89    impl Node {
90        /// Return the name of remote node
91        pub fn name(&self) -> &str {
92            &self.name
93        }
94    }
95
96    impl<T: Into<String>> From<T> for Node {
97        fn from(node: T) -> Self {
98            let name = node.into();
99            assert!(!name.is_empty(), "node name cannot be empty!");
100            Self { name }
101        }
102    }
103
104    impl std::fmt::Display for Node {
105        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106            let name = &self.name;
107            write!(f, "{name}")
108        }
109    }
110
111    /// Represents a list of remote nodes allocated for computation
112    #[derive(Clone)]
113    pub struct Nodes {
114        rx: Receiver<Node>,
115        tx: Sender<Node>,
116    }
117
118    impl Nodes {
119        /// Construct `Nodes` from a list of nodes.
120        pub fn new<T: Into<Node>>(nodes: impl IntoIterator<Item = T>) -> Self {
121            let (tx, rx) = unbounded();
122            let nodes = nodes.into_iter().collect_vec();
123            let n = nodes.len();
124            info!("We have {n} nodes in totoal for computation.");
125            for node in nodes {
126                tx.send(node.into()).unwrap();
127            }
128            Self { rx, tx }
129        }
130
131        /// Return the number of nodes
132        pub fn len(&self) -> usize {
133            self.rx.len()
134        }
135
136        /// Borrow one node from `Nodes`
137        pub fn borrow_node(&self) -> Result<Node> {
138            let node = self.rx.recv()?;
139            let name = &node.name;
140            info!("client borrowed one node: {name:?}");
141            Ok(node)
142        }
143
144        /// Return one `node` to `Nodes`
145        pub fn return_node(&self, node: Node) -> Result<()> {
146            let name = &node.name;
147            info!("client returned node {name:?}");
148            self.tx.send(node)?;
149            Ok(())
150        }
151    }
152}
153// 769262a8 ends here
154
155// [[file:../remote.note::e19bce71][e19bce71]]
156use std::path::{Path, PathBuf};
157
158use gosh_runner::prelude::SpawnSessionExt;
159use gosh_runner::process::Session;
160
161use tempfile::TempDir;
162// e19bce71 ends here
163
164// [[file:../remote.note::955c926a][955c926a]]
165/// Computation represents a submitted `Job`
166pub struct Computation {
167    job: Job,
168
169    /// command session. The drop order is above Tempdir
170    session: Option<Session<tokio::process::Child>>,
171
172    /// The working directory of computation
173    wrk_dir: TempDir,
174}
175// 955c926a ends here
176
177// [[file:../remote.note::a65e6dae][a65e6dae]]
178impl Computation {
179    /// The full path to the working directory for running the job.
180    fn wrk_dir(&self) -> &Path {
181        self.wrk_dir.path()
182    }
183
184    /// The full path to computation output file (stdout).
185    fn out_file(&self) -> PathBuf {
186        self.wrk_dir().join(&self.job.out_file)
187    }
188
189    /// The full path to computation error file (stderr).
190    fn err_file(&self) -> PathBuf {
191        self.wrk_dir().join(&self.job.err_file)
192    }
193
194    /// The full path to the script for running the job.
195    fn run_file(&self) -> PathBuf {
196        self.wrk_dir().join(&self.job.run_file)
197    }
198}
199// a65e6dae ends here
200
201// [[file:../remote.note::f8672e0c][f8672e0c]]
202impl Job {
203    /// Submit the job and turn it into Computation.
204    pub fn submit(self) -> Result<Computation> {
205        Computation::try_run(self)
206    }
207}
208
209impl Computation {
210    /// create run file and make sure it executable later
211    fn create_run_file(&self) -> Result<()> {
212        let run_file = &self.run_file();
213        gut::fs::write_script_file(run_file, &self.job.script)?;
214        LockFile::wait(&run_file, 2.0)?;
215
216        Ok(())
217    }
218
219    /// Construct `Computation` of user inputted `Job`.
220    fn try_run(job: Job) -> Result<Self> {
221        // create working directory in scratch space.
222        let wdir = tempfile::TempDir::new_in(".").expect("temp dir");
223        let session = Self {
224            job,
225            wrk_dir: wdir.into(),
226            session: None,
227        };
228
229        session.create_run_file()?;
230
231        Ok(session)
232    }
233
234    /// Wait for background command to complete.
235    async fn wait(&mut self) -> Result<()> {
236        if let Some(s) = self.session.as_mut() {
237            let ecode = s.child.wait().await?;
238            info!("job session exited: {}", ecode);
239            if !ecode.success() {
240                error!("job exited unsuccessfully!");
241                let txt = gut::fs::read_file(self.run_file())?;
242                let run = format!("run file: {txt:?}");
243                let txt = gut::fs::read_file(self.err_file())?;
244                let err = format!("stderr: {txt:?}");
245                bail!("Job failed with error:\n{run:?}{err:?}");
246            }
247            Ok(())
248        } else {
249            bail!("Job not started yet.");
250        }
251    }
252
253    /// Run command in background.
254    async fn start(&mut self) -> Result<()> {
255        let program = self.run_file();
256        let wdir = self.wrk_dir();
257        trace!("job work direcotry: {}", wdir.display());
258
259        let mut session = tokio::process::Command::new(&program)
260            .current_dir(wdir)
261            .stdout(std::process::Stdio::piped())
262            .stderr(std::process::Stdio::piped())
263            .spawn_session()?;
264
265        let mut stdout = session
266            .child
267            .stdout
268            .take()
269            .expect("child did not have a handle to stdout");
270        let mut stderr = session
271            .child
272            .stderr
273            .take()
274            .expect("child did not have a handle to stderr");
275
276        // redirect stdout and stderr to files for user inspection.
277        let mut fout = tokio::fs::File::create(self.out_file()).await?;
278        let mut ferr = tokio::fs::File::create(self.err_file()).await?;
279        tokio::io::copy(&mut stdout, &mut fout).await?;
280        tokio::io::copy(&mut stderr, &mut ferr).await?;
281
282        let sid = session.handler().id();
283        debug!("command running in session {:?}", sid);
284        self.session = session.into();
285
286        Ok(())
287    }
288
289    /// Start computation, and wait and return its standard output
290    pub async fn wait_for_output(&mut self) -> Result<String> {
291        self.start().await?;
292        self.wait().await?;
293        let txt = gut::fs::read_file(self.out_file())?;
294        Ok(txt)
295    }
296}
297// f8672e0c ends here
298
299// [[file:../remote.note::9b7911ae][9b7911ae]]
/// A file-lock based guard for enforcing a single running instance.
#[derive(Debug)]
pub struct LockFile {
    /// Open handle of the lock file; the exclusive lock is taken on it.
    file: std::fs::File,
    /// Location of the lock file, needed to remove it on drop.
    path: PathBuf,
}
306
307impl LockFile {
308    fn create(path: &Path) -> Result<LockFile> {
309        use fs2::*;
310
311        let file = std::fs::OpenOptions::new()
312            .create(true)
313            .write(true)
314            .open(&path)
315            .context("Could not create ID file")?;
316
317        // https://docs.rs/fs2/0.4.3/fs2/trait.FileExt.html
318        file.try_lock_exclusive()
319            .context("Could not lock lock file; Is the instance already running?")?;
320
321        Ok(LockFile {
322            file,
323            path: path.to_owned(),
324        })
325    }
326
327    fn write_msg(&mut self, msg: &str) -> Result<()> {
328        writeln!(&mut self.file, "{msg}").context("Could not write lock file")?;
329        self.file.flush().context("Could not flush lock file")
330    }
331
332    /// Create a lockfile contains text `msg`
333    pub fn new(path: &Path, msg: impl std::fmt::Display) -> Result<Self> {
334        let mut lockfile = Self::create(path)?;
335        lockfile.write_msg(&msg.to_string())?;
336        Ok(lockfile)
337    }
338
339    /// Wait until file `f` available for max time of `timeout`.
340    ///
341    /// # Parameters
342    /// * timeout: timeout in seconds
343    /// * f: the file to wait for available
344    pub fn wait(f: &std::path::Path, timeout: f64) -> Result<()> {
345        use gut::utils::sleep;
346
347        // wait a moment for socke file ready
348        let interval = 0.1;
349        let mut t = 0.0;
350        loop {
351            if f.exists() {
352                trace!("Elapsed time during waiting: {:.2} seconds ", t);
353                return Ok(());
354            }
355            t += interval;
356            sleep(interval);
357
358            if t > timeout {
359                bail!("file {:?} doest exist for {} seconds", f, timeout);
360            }
361        }
362    }
363}
364
365impl Drop for LockFile {
366    fn drop(&mut self) {
367        let _ = std::fs::remove_file(&self.path);
368    }
369}
370// 9b7911ae ends here
371
372// [[file:../remote.note::4a28f1b7][4a28f1b7]]
373pub use node::{Node, Nodes};
374// 4a28f1b7 ends here
375
376// [[file:../remote.note::f725ca9b][f725ca9b]]
/// A `Node` built from a string reports that string as its name.
#[test]
fn test_node() {
    let localhost = Node::from("localhost");
    assert_eq!(localhost.name(), "localhost");
}
382// f725ca9b ends here