1use super::*;
4
5use serde::{Deserialize, Serialize};
6use tempfile::{tempdir, tempdir_in, TempDir};
7#[derive(Debug, Deserialize, Serialize)]
12pub struct Job {
13 input: String,
15
16 script: String,
18
19 inp_file: PathBuf,
21
22 out_file: PathBuf,
24
25 err_file: PathBuf,
27
28 run_file: PathBuf,
30
31 extra_files: Vec<PathBuf>,
33}
34
35impl Job {
36 pub fn new(script: &str) -> Self {
43 Self {
44 script: script.into(),
45 input: String::new(),
46
47 out_file: "job.out".into(),
48 err_file: "job.err".into(),
49 run_file: "run".into(),
50 inp_file: "job.inp".into(),
51 extra_files: vec![],
52 }
53 }
54
55 pub fn attach_file<P: AsRef<Path>>(&mut self, file: P) {
57 let file: PathBuf = file.as_ref().into();
58 if !self.extra_files.contains(&file) {
59 self.extra_files.push(file);
60 } else {
61 warn!("try to attach a dumplicated file: {}!", file.display());
62 }
63 }
64}
65pub struct Computation {
70 job: Job,
71
72 session: Option<crate::process::Session<tokio::process::Child>>,
74
75 wrk_dir: TempDir,
77}
78impl Computation {
82 pub fn wrk_dir(&self) -> &Path {
84 self.wrk_dir.path()
85 }
86
87 pub fn inp_file(&self) -> PathBuf {
89 self.wrk_dir().join(&self.job.inp_file)
90 }
91
92 pub fn out_file(&self) -> PathBuf {
94 self.wrk_dir().join(&self.job.out_file)
95 }
96
97 pub fn err_file(&self) -> PathBuf {
99 self.wrk_dir().join(&self.job.err_file)
100 }
101
102 pub fn run_file(&self) -> PathBuf {
104 self.wrk_dir().join(&self.job.run_file)
105 }
106}
107use tokio::io::AsyncWriteExt;
111
112impl Job {
113 pub fn submit(self) -> Computation {
115 Computation::new(self)
116 }
117}
118
119impl Computation {
120 pub fn new(job: Job) -> Self {
122 use std::fs::File;
123 use std::os::unix::fs::OpenOptionsExt;
124
125 let wdir = tempfile::TempDir::new_in(".").expect("temp dir");
127 let session = Computation {
128 job,
129 wrk_dir: wdir.into(),
130 session: None,
131 };
132
133 let file = session.run_file();
135
136 match std::fs::OpenOptions::new()
138 .create(true)
139 .write(true)
140 .mode(0o770)
141 .open(&file)
142 {
143 Ok(mut f) => {
144 let _ = f.write_all(session.job.script.as_bytes());
145 trace!("script content wrote to: {}.", file.display());
146 }
147 Err(e) => {
148 panic!("Error whiling creating job run file: {}", e);
149 }
150 }
151 let file = session.inp_file();
152 match File::create(&session.inp_file()) {
153 Ok(mut f) => {
154 let _ = f.write_all(session.job.input.as_bytes());
155 trace!("input content wrote to: {}.", file.display());
156 }
157 Err(e) => {
158 panic!("Error while creating job input file: {}", e);
159 }
160 }
161
162 session
163 }
164
165 async fn wait(&mut self) -> Result<()> {
167 if let Some(s) = self.session.as_mut() {
168 let ecode = s.child.wait().await?;
169 info!("job session exited: {}", ecode);
170 } else {
171 error!("Job not started yet.");
172 }
173 Ok(())
174 }
175
176 async fn start(&mut self) -> Result<()> {
178 use crate::process::SpawnSessionExt;
179
180 let wdir = self.wrk_dir();
181 info!("job work direcotry: {}", wdir.display());
182
183 let mut session = tokio::process::Command::new(&self.run_file())
184 .current_dir(wdir)
185 .stdin(std::process::Stdio::piped())
186 .stdout(std::process::Stdio::piped())
187 .stderr(std::process::Stdio::piped())
188 .spawn_session()?;
189
190 let mut stdin = session
191 .child
192 .stdin
193 .take()
194 .expect("child did not have a handle to stdout");
195 let mut stdout = session
196 .child
197 .stdout
198 .take()
199 .expect("child did not have a handle to stdout");
200 let mut stderr = session
201 .child
202 .stderr
203 .take()
204 .expect("child did not have a handle to stderr");
205
206 stdin.write_all(self.job.input.as_bytes()).await;
208
209 let mut fout = tokio::fs::File::create(self.out_file()).await?;
211 let mut ferr = tokio::fs::File::create(self.err_file()).await?;
212 tokio::io::copy(&mut stdout, &mut fout).await?;
213 tokio::io::copy(&mut stderr, &mut ferr).await?;
214
215 let sid = session.handler().id();
216 info!("command running in session {:?}", sid);
217 self.session = session.into();
218
219 Ok(())
220 }
221
222 fn is_started(&self) -> bool {
224 self.session.is_some()
225 }
226}
227impl Computation {
231 pub fn extra_files(&self) -> Vec<PathBuf> {
233 self.job.extra_files.iter().map(|f| self.wrk_dir().join(f)).collect()
234 }
235
236 pub fn is_done(&self) -> bool {
238 let inpfile = self.inp_file();
239 let outfile = self.out_file();
240 let errfile = self.err_file();
241
242 if self.wrk_dir().is_dir() {
243 if outfile.is_file() && inpfile.is_file() {
244 if let Ok(time2) = outfile.metadata().and_then(|m| m.modified()) {
245 if let Ok(time1) = inpfile.metadata().and_then(|m| m.modified()) {
246 if time2 >= time1 {
247 return true;
248 }
249 }
250 }
251 }
252 }
253
254 false
255 }
256
257 pub fn fake_done(&self) {
259 todo!()
260 }
261}
262mod db {
266 use super::*;
267
268 use bytes::Bytes;
269 use std::sync::Arc;
270 use tokio::sync::Mutex;
271
272 pub use super::impl_jobs_slotmap::Id;
273 use super::impl_jobs_slotmap::JobKey;
274 use super::impl_jobs_slotmap::Jobs;
275
276 #[derive(Clone)]
278 pub struct Db {
279 inner: Arc<Mutex<Jobs>>,
280 }
281
282 impl Db {
283 pub fn new() -> Self {
285 Self {
286 inner: Arc::new(Mutex::new(Jobs::new())),
287 }
288 }
289
290 pub async fn update_job(&mut self, id: JobId, new_job: Job) -> Result<()> {
293 debug!("update_job: id={}, job={:?}", id, new_job);
294 let mut jobs = self.inner.lock().await;
295 let k = jobs.check_job(id)?;
296 if jobs[k].is_started() {
297 bail!("job {} has been started", id);
298 } else {
299 jobs[k] = new_job.submit();
300 }
301
302 Ok(())
303 }
304
305 pub async fn get_job_list(&self) -> Vec<JobId> {
307 self.inner.lock().await.iter().map(|(k, _)| k).collect()
308 }
309
310 pub async fn put_job_file(&mut self, id: JobId, file: String, body: Bytes) -> Result<()> {
312 debug!("put_job_file: id={}", id);
313
314 let jobs = self.inner.lock().await;
315 let id = jobs.check_job(id)?;
316
317 let job = &jobs[id];
318 let p = job.wrk_dir().join(&file);
319 info!("client request to put a file: {}", p.display());
320 match std::fs::File::create(p) {
321 Ok(mut f) => {
322 f.write_all(&body).context("write job file")?;
323 Ok(())
324 }
325 Err(e) => {
326 bail!("create file error:\n{}", e);
327 }
328 }
329 }
330
331 pub async fn get_job_file(&self, id: JobId, file: &Path) -> Result<Vec<u8>> {
333 debug!("get_job_file: id={}", id);
334 let jobs = self.inner.lock().await;
335 let k = jobs.check_job(id)?;
336 let job = &jobs[k];
337 let p = job.wrk_dir().join(&file);
338 info!("client request file: {}", p.display());
339
340 let mut buffer = Vec::new();
341 let _ = std::fs::File::open(p)
342 .context("open file")?
343 .read_to_end(&mut buffer)
344 .context("read file")?;
345 Ok(buffer)
346 }
347
348 pub async fn list_job_files(&self, id: JobId) -> Result<Vec<PathBuf>> {
350 info!("list files for job {}", id);
351 let jobs = self.inner.lock().await;
352 let id = jobs.check_job(id)?;
353
354 let mut list = vec![];
355 let job = &jobs[id];
356 for entry in std::fs::read_dir(job.wrk_dir()).context("list dir")? {
357 if let Ok(entry) = entry {
358 let p = entry.path();
359 if p.is_file() {
360 list.push(p);
361 }
362 }
363 }
364 Ok(list)
365 }
366
367 pub async fn clear_jobs(&mut self) {
370 self.inner.lock().await.clear();
371 }
372
373 pub async fn delete_job(&mut self, id: JobId) -> Result<()> {
376 info!("delete_job: id={}", id);
377 self.inner.lock().await.remove(id)?;
378 Ok(())
379 }
380
381 pub async fn insert_job(&mut self, mut job: Job) -> JobId {
383 info!("create_job: {:?}", job);
384 let mut jobs = self.inner.lock().await;
385 let jid = jobs.insert(job.submit());
386 info!("Job {} created.", jid);
387 jid
388 }
389
390 pub async fn wait_job(&self, id: JobId) -> Result<()> {
392 info!("wait_job: id={}", id);
393 let mut jobs = self.inner.lock().await;
394 let k = jobs.check_job(id)?;
395 jobs[k].start().await?;
396 jobs[k].wait().await?;
397 Ok(())
398 }
399 }
400}
401mod impl_jobs_slotmap {
405 use super::*;
406
407 use bimap::BiMap;
408 use slotmap::Key;
409 use slotmap::{DefaultKey, SlotMap};
410
411 pub type Id = usize;
413
414 pub(super) type JobKey = DefaultKey;
415
416 pub struct Jobs {
417 inner: SlotMap<DefaultKey, Computation>,
418 mapping: BiMap<usize, JobKey>,
419 }
420
421 impl Jobs {
422 pub fn new() -> Self {
424 Self {
425 inner: SlotMap::new(),
426 mapping: BiMap::new(),
427 }
428 }
429
430 pub fn check_job(&self, id: Id) -> Result<JobKey> {
433 if let Some(&k) = self.mapping.get_by_left(&id) {
434 Ok(k)
435 } else {
436 bail!("Job id not found: {}", id);
437 }
438 }
439
440 pub fn insert(&mut self, job: Computation) -> Id {
442 let k = self.inner.insert(job);
443 let n = self.mapping.len() + 1;
444 if let Err(e) = self.mapping.insert_no_overwrite(n, k) {
445 panic!("invalid {:?}", e);
446 }
447 n
448 }
449
450 pub fn remove(&mut self, id: Id) -> Result<()> {
452 let k = self.check_job(id)?;
453 let job = &self.inner[k];
454 if job.is_started() {
455 info!("Job {} has been started.", id);
456 }
457 let _ = self.inner.remove(k);
459 Ok(())
460 }
461
462 pub fn clear(&mut self) {
464 for (k, job) in self.inner.iter() {
465 if job.is_started() {
466 info!("job {} already started.", self.to_id(k));
467 }
468 }
469 self.inner.clear();
471 }
472
473 pub fn iter(&self) -> impl Iterator<Item = (Id, &Computation)> {
475 self.inner.iter().map(move |(k, v)| (self.to_id(k), v))
476 }
477
478 fn to_id(&self, k: JobKey) -> Id {
479 if let Some(&id) = self.mapping.get_by_right(&k) {
480 id
481 } else {
482 panic!("invalid job key {:?}", k);
483 }
484 }
485 }
486
487 impl std::ops::Index<JobKey> for Jobs {
488 type Output = Computation;
489
490 fn index(&self, key: JobKey) -> &Self::Output {
491 &self.inner[key]
492 }
493 }
494
495 impl std::ops::IndexMut<JobKey> for Jobs {
496 fn index_mut(&mut self, key: JobKey) -> &mut Self::Output {
497 &mut self.inner[key]
498 }
499 }
500}
501pub use self::db::Db;
505pub use self::db::Id as JobId;
506