1use super::*;
3#[derive(Debug, Clone, Deserialize, Serialize)]
8#[serde(default)]
9pub struct Job {
10 pub(crate) script: String,
13
14 name: String,
16
17 pub out_file: PathBuf,
19
20 pub err_file: PathBuf,
22
23 pub run_file: PathBuf,
25}
26
27impl Default for Job {
28 fn default() -> Self {
29 Self {
30 script: "pwd".into(),
31 name: random_name(),
32 out_file: "job.out".into(),
33 err_file: "job.err".into(),
34 run_file: "run".into(),
35 }
36 }
37}
38
39impl Job {
40 pub fn new(script: impl Into<String>) -> Self {
47 Self {
48 script: script.into(),
49 ..Default::default()
50 }
51 }
52
53 pub fn with_name(mut self, name: &str) -> Self {
55 self.name = name.into();
56 self
57 }
58
59 pub fn name(&self) -> String {
61 self.name.clone()
62 }
63}
64
65fn random_name() -> String {
66 use rand::distributions::Alphanumeric;
67 use rand::Rng;
68
69 let mut rng = rand::thread_rng();
70 std::iter::repeat(())
71 .map(|()| rng.sample(Alphanumeric))
72 .map(char::from)
73 .take(6)
74 .collect()
75}
76mod node {
80 use super::*;
81 use crossbeam_channel::{unbounded, Receiver, Sender};
82
83 #[derive(Debug, Clone, Deserialize, Serialize)]
85 pub struct Node {
86 name: String,
87 }
88
89 impl Node {
90 pub fn name(&self) -> &str {
92 &self.name
93 }
94 }
95
96 impl<T: Into<String>> From<T> for Node {
97 fn from(node: T) -> Self {
98 let name = node.into();
99 assert!(!name.is_empty(), "node name cannot be empty!");
100 Self { name }
101 }
102 }
103
104 impl std::fmt::Display for Node {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 let name = &self.name;
107 write!(f, "{name}")
108 }
109 }
110
111 #[derive(Clone)]
113 pub struct Nodes {
114 rx: Receiver<Node>,
115 tx: Sender<Node>,
116 }
117
118 impl Nodes {
119 pub fn new<T: Into<Node>>(nodes: impl IntoIterator<Item = T>) -> Self {
121 let (tx, rx) = unbounded();
122 let nodes = nodes.into_iter().collect_vec();
123 let n = nodes.len();
124 info!("We have {n} nodes in totoal for computation.");
125 for node in nodes {
126 tx.send(node.into()).unwrap();
127 }
128 Self { rx, tx }
129 }
130
131 pub fn len(&self) -> usize {
133 self.rx.len()
134 }
135
136 pub fn borrow_node(&self) -> Result<Node> {
138 let node = self.rx.recv()?;
139 let name = &node.name;
140 info!("client borrowed one node: {name:?}");
141 Ok(node)
142 }
143
144 pub fn return_node(&self, node: Node) -> Result<()> {
146 let name = &node.name;
147 info!("client returned node {name:?}");
148 self.tx.send(node)?;
149 Ok(())
150 }
151 }
152}
153use std::path::{Path, PathBuf};
157
158use gosh_runner::prelude::SpawnSessionExt;
159use gosh_runner::process::Session;
160
161use tempfile::TempDir;
162pub struct Computation {
167 job: Job,
168
169 session: Option<Session<tokio::process::Child>>,
171
172 wrk_dir: TempDir,
174}
175impl Computation {
179 fn wrk_dir(&self) -> &Path {
181 self.wrk_dir.path()
182 }
183
184 fn out_file(&self) -> PathBuf {
186 self.wrk_dir().join(&self.job.out_file)
187 }
188
189 fn err_file(&self) -> PathBuf {
191 self.wrk_dir().join(&self.job.err_file)
192 }
193
194 fn run_file(&self) -> PathBuf {
196 self.wrk_dir().join(&self.job.run_file)
197 }
198}
199impl Job {
203 pub fn submit(self) -> Result<Computation> {
205 Computation::try_run(self)
206 }
207}
208
209impl Computation {
210 fn create_run_file(&self) -> Result<()> {
212 let run_file = &self.run_file();
213 gut::fs::write_script_file(run_file, &self.job.script)?;
214 LockFile::wait(&run_file, 2.0)?;
215
216 Ok(())
217 }
218
219 fn try_run(job: Job) -> Result<Self> {
221 let wdir = tempfile::TempDir::new_in(".").expect("temp dir");
223 let session = Self {
224 job,
225 wrk_dir: wdir.into(),
226 session: None,
227 };
228
229 session.create_run_file()?;
230
231 Ok(session)
232 }
233
234 async fn wait(&mut self) -> Result<()> {
236 if let Some(s) = self.session.as_mut() {
237 let ecode = s.child.wait().await?;
238 info!("job session exited: {}", ecode);
239 if !ecode.success() {
240 error!("job exited unsuccessfully!");
241 let txt = gut::fs::read_file(self.run_file())?;
242 let run = format!("run file: {txt:?}");
243 let txt = gut::fs::read_file(self.err_file())?;
244 let err = format!("stderr: {txt:?}");
245 bail!("Job failed with error:\n{run:?}{err:?}");
246 }
247 Ok(())
248 } else {
249 bail!("Job not started yet.");
250 }
251 }
252
253 async fn start(&mut self) -> Result<()> {
255 let program = self.run_file();
256 let wdir = self.wrk_dir();
257 trace!("job work direcotry: {}", wdir.display());
258
259 let mut session = tokio::process::Command::new(&program)
260 .current_dir(wdir)
261 .stdout(std::process::Stdio::piped())
262 .stderr(std::process::Stdio::piped())
263 .spawn_session()?;
264
265 let mut stdout = session
266 .child
267 .stdout
268 .take()
269 .expect("child did not have a handle to stdout");
270 let mut stderr = session
271 .child
272 .stderr
273 .take()
274 .expect("child did not have a handle to stderr");
275
276 let mut fout = tokio::fs::File::create(self.out_file()).await?;
278 let mut ferr = tokio::fs::File::create(self.err_file()).await?;
279 tokio::io::copy(&mut stdout, &mut fout).await?;
280 tokio::io::copy(&mut stderr, &mut ferr).await?;
281
282 let sid = session.handler().id();
283 debug!("command running in session {:?}", sid);
284 self.session = session.into();
285
286 Ok(())
287 }
288
289 pub async fn wait_for_output(&mut self) -> Result<String> {
291 self.start().await?;
292 self.wait().await?;
293 let txt = gut::fs::read_file(self.out_file())?;
294 Ok(txt)
295 }
296}
/// An exclusively locked file that is removed again when this value is dropped.
#[derive(Debug)]
pub struct LockFile {
    /// Open handle that holds the exclusive lock.
    file: std::fs::File,
    /// Where the lock file lives; deleted on drop.
    path: PathBuf,
}
306
307impl LockFile {
308 fn create(path: &Path) -> Result<LockFile> {
309 use fs2::*;
310
311 let file = std::fs::OpenOptions::new()
312 .create(true)
313 .write(true)
314 .open(&path)
315 .context("Could not create ID file")?;
316
317 file.try_lock_exclusive()
319 .context("Could not lock lock file; Is the instance already running?")?;
320
321 Ok(LockFile {
322 file,
323 path: path.to_owned(),
324 })
325 }
326
327 fn write_msg(&mut self, msg: &str) -> Result<()> {
328 writeln!(&mut self.file, "{msg}").context("Could not write lock file")?;
329 self.file.flush().context("Could not flush lock file")
330 }
331
332 pub fn new(path: &Path, msg: impl std::fmt::Display) -> Result<Self> {
334 let mut lockfile = Self::create(path)?;
335 lockfile.write_msg(&msg.to_string())?;
336 Ok(lockfile)
337 }
338
339 pub fn wait(f: &std::path::Path, timeout: f64) -> Result<()> {
345 use gut::utils::sleep;
346
347 let interval = 0.1;
349 let mut t = 0.0;
350 loop {
351 if f.exists() {
352 trace!("Elapsed time during waiting: {:.2} seconds ", t);
353 return Ok(());
354 }
355 t += interval;
356 sleep(interval);
357
358 if t > timeout {
359 bail!("file {:?} doest exist for {} seconds", f, timeout);
360 }
361 }
362 }
363}
364
365impl Drop for LockFile {
366 fn drop(&mut self) {
367 let _ = std::fs::remove_file(&self.path);
368 }
369}
370pub use node::{Node, Nodes};
#[test]
fn test_node() {
    let node = Node::from("localhost");
    assert_eq!(node.name(), "localhost");
}
382