1use indicatif::ProgressBar;
5use log::{debug, trace, warn};
6use serde_json::Value;
7use std::cmp::Ordering;
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::time::Duration;
11
12use crate::cluster::{self, SchedulerType};
13use crate::expr;
14use crate::launcher;
15use crate::progress_styles;
16use crate::scheduler::bash::Bash;
17use crate::scheduler::slurm::Slurm;
18use crate::scheduler::Scheduler;
19use crate::state::State;
20use crate::workflow::{Action, Selector, Workflow};
21use crate::{Error, MultiProgressContainer};
22
pub struct Project {
    /// The workflow definition (opened via `Workflow::open`).
    workflow: Workflow,

    /// Cached workspace state (restored via `State::from_cache`): directory
    /// values, completed actions, and submitted jobs.
    state: State,

    /// Scheduler backend chosen from the cluster configuration
    /// (`Bash` or `Slurm`).
    scheduler: Box<dyn Scheduler>,

    /// Name of the cluster this project was opened on.
    cluster_name: String,
}
50
/// Directories bucketed by their status with respect to one action.
///
/// Produced by [`Project::separate_by_status`]; each directory appears in
/// exactly one of the four buckets.
#[derive(Debug)]
pub struct Status {
    /// Directories where the action has completed.
    pub completed: Vec<PathBuf>,

    /// Directories currently submitted to the scheduler for the action.
    pub submitted: Vec<PathBuf>,

    /// Directories eligible to run: every previous action has completed.
    pub eligible: Vec<PathBuf>,

    /// Directories waiting on one or more previous actions to complete.
    pub waiting: Vec<PathBuf>,
}
69
70impl Project {
    /// Open the project: load the workflow, identify the cluster, build the
    /// matching scheduler backend, restore the cached state, and reconcile
    /// submitted jobs with the scheduler's active-job list.
    ///
    /// `cluster_name` overrides cluster auto-identification when `Some`;
    /// `io_threads` is forwarded to workspace synchronization.
    ///
    /// # Errors
    /// Returns `Err` when any configuration fails to open, when workspace
    /// synchronization fails, or when the scheduler query fails.
    pub fn open(
        io_threads: u16,
        cluster_name: &Option<String>,
        multi_progress: &mut MultiProgressContainer,
    ) -> Result<Project, Error> {
        trace!("Opening project.");
        let workflow = Workflow::open()?;
        let clusters = cluster::Configuration::open()?;
        let cluster = clusters.identify(cluster_name.as_deref())?;
        let launchers = launcher::Configuration::open()?.by_cluster(&cluster.name);
        let cluster_name = cluster.name.clone();

        // Select the scheduler backend that matches the cluster configuration.
        let scheduler: Box<dyn Scheduler> = match cluster.scheduler {
            SchedulerType::Bash => Box::new(Bash::new(cluster, launchers)),
            SchedulerType::Slurm => Box::new(Slurm::new(cluster, launchers)),
        };

        let mut state = State::from_cache(&workflow)?;

        // Show the spinner only when there are submitted jobs to check.
        let jobs = state.jobs_submitted_on(&cluster_name);
        let mut progress =
            ProgressBar::new_spinner().with_message("Checking submitted job statuses");
        progress = multi_progress.add_or_hide(progress, jobs.is_empty());

        progress.enable_steady_tick(Duration::from_millis(progress_styles::STEADY_TICK));
        progress.set_style(progress_styles::uncounted_spinner());
        progress.tick();

        // Kick off the active-job query *before* synchronizing the workspace —
        // presumably so the scheduler-side query and the local I/O overlap;
        // the result is only collected below via `get()`. Do not reorder.
        let active_jobs = scheduler.active_jobs(&jobs)?;

        state.synchronize_workspace(&workflow, io_threads, multi_progress)?;

        let active_jobs = active_jobs.get()?;
        progress.finish();

        // Jobs recorded as submitted that the scheduler no longer reports as
        // active are removed from the cached state.
        if active_jobs.len() != jobs.len() {
            state.remove_inactive_submitted(&cluster_name, &active_jobs);
        } else if !jobs.is_empty() {
            trace!("All submitted jobs remain active on {cluster_name}.");
        }

        Ok(Self {
            workflow,
            state,
            scheduler,
            cluster_name,
        })
    }
127
    /// Close the project, saving the state cache.
    ///
    /// # Errors
    /// Returns `Err` when the state cache cannot be saved.
    pub fn close(&mut self, multi_progress: &mut MultiProgressContainer) -> Result<(), Error> {
        debug!("Closing project.");

        self.state.save_cache(&self.workflow, multi_progress)?;

        Ok(())
    }
145
    /// The project's workflow definition.
    pub fn workflow(&self) -> &Workflow {
        &self.workflow
    }
150
    /// The project's cached workspace state.
    pub fn state(&self) -> &State {
        &self.state
    }
155
    /// The name of the cluster this project was opened on.
    ///
    /// NOTE(review): `&str` would be the more idiomatic return type than
    /// `&String`, but changing it is not backward-compatible for callers
    /// that bind the result as `&String`.
    pub fn cluster_name(&self) -> &String {
        &self.cluster_name
    }
160
161 pub fn find_matching_directories(
179 &self,
180 action: &Action,
181 directories: Vec<PathBuf>,
182 ) -> Result<Vec<PathBuf>, Error> {
183 trace!(
184 "Finding directories that action '{}' includes.",
185 action.name()
186 );
187
188 let mut matching_directories = Vec::with_capacity(directories.len());
189
190 'outer: for name in directories {
191 if let Some(value) = self.state.values().get(&name) {
192 if action.group.include().is_empty() {
193 matching_directories.push(name);
194 } else {
195 for selector in action.group.include() {
196 let result = match selector {
197 Selector::Condition((include, comparison, expected)) => {
198 let actual = value.pointer(include).ok_or_else(|| {
199 Error::JSONPointerNotFound(name.clone(), include.clone())
200 })?;
201
202 expr::evaluate_json_comparison(comparison, actual, expected)
203 .ok_or_else(|| {
204 Error::CannotCompareInclude(
205 actual.clone(),
206 expected.clone(),
207 name.clone(),
208 )
209 })
210 }
211
212 Selector::All(conditions) => {
213 let mut matches = 0;
214 for (include, comparison, expected) in conditions {
215 let actual = value.pointer(include).ok_or_else(|| {
216 Error::JSONPointerNotFound(name.clone(), include.clone())
217 })?;
218
219 if !expr::evaluate_json_comparison(comparison, actual, expected)
220 .ok_or_else(|| {
221 Error::CannotCompareInclude(
222 actual.clone(),
223 expected.clone(),
224 name.clone(),
225 )
226 })?
227 {
228 break;
229 }
230 matches += 1;
231 }
232 Ok(matches == conditions.len())
233 }
234 };
235
236 if result? {
237 matching_directories.push(name);
238 continue 'outer;
239 }
240 }
241 }
242 } else {
243 warn!("Directory '{}' not found in workspace.", name.display());
244 }
245 }
246
247 trace!("Found {} match(es).", matching_directories.len());
248 Ok(matching_directories)
249 }
250
251 pub fn separate_by_status(
264 &self,
265 action: &Action,
266 directories: Vec<PathBuf>,
267 ) -> Result<Status, Error> {
268 trace!(
269 "Separating {} directories by status for '{}'.",
270 directories.len(),
271 action.name()
272 );
273 let capacity = directories.capacity();
274 let mut status = Status {
275 completed: Vec::with_capacity(capacity),
276 submitted: Vec::with_capacity(capacity),
277 eligible: Vec::with_capacity(capacity),
278 waiting: Vec::with_capacity(capacity),
279 };
280
281 for directory_name in directories {
282 if !self.state.values().contains_key(&directory_name) {
283 return Err(Error::DirectoryNotFound(directory_name));
284 }
285
286 let completed = self.state.completed();
287
288 if completed[action.name()].contains(&directory_name) {
289 status.completed.push(directory_name);
290 } else if self.state.is_submitted(action.name(), &directory_name) {
291 status.submitted.push(directory_name);
292 } else if action
293 .previous_actions()
294 .iter()
295 .all(|a| completed[a].contains(&directory_name))
296 {
297 status.eligible.push(directory_name);
298 } else {
299 status.waiting.push(directory_name);
300 }
301 }
302
303 Ok(status)
304 }
305
306 pub fn separate_into_groups(
316 &self,
317 action: &Action,
318 mut directories: Vec<PathBuf>,
319 ) -> Result<Vec<Vec<PathBuf>>, Error> {
320 trace!(
321 "Separating {} directories into groups for '{}'.",
322 directories.len(),
323 action.name()
324 );
325
326 if directories.is_empty() {
327 return Ok(Vec::new());
328 }
329
330 directories.sort_unstable();
332
333 let mut sort_keys = HashMap::new();
335 for directory_name in &directories {
336 let value = self
337 .state
338 .values()
339 .get(directory_name)
340 .ok_or_else(|| Error::DirectoryNotFound(directory_name.clone()))?;
341
342 let mut sort_key = Vec::new();
343 for pointer in action.group.sort_by() {
344 let element = value.pointer(pointer).ok_or_else(|| {
345 Error::JSONPointerNotFound(directory_name.clone(), pointer.clone())
346 })?;
347 sort_key.push(element.clone());
348 }
349 sort_keys.insert(directory_name.clone(), Value::Array(sort_key));
350 }
351
352 let mut result = Vec::new();
354 if action.group.sort_by().is_empty() {
355 if action.group.reverse_sort() {
356 directories.reverse();
357 }
358 result.push(directories);
359 } else {
360 directories.sort_by(|a, b| {
361 expr::partial_cmp_json_values(&sort_keys[a], &sort_keys[b])
362 .expect("Valid JSON comparison")
363 });
364
365 if action.group.reverse_sort() {
366 directories.reverse();
367 }
368
369 #[allow(clippy::redundant_closure_for_method_calls)]
371 if action.group.split_by_sort_key() {
372 result.extend(
373 directories
374 .chunk_by(|a, b| {
375 expr::partial_cmp_json_values(&sort_keys[a], &sort_keys[b])
376 .expect("Valid JSON comparison")
377 == Ordering::Equal
378 })
379 .map(|v| v.to_vec()),
380 );
381 } else {
382 result.push(directories);
383 }
384 }
385
386 if let Some(maximum_size) = action.group.maximum_size {
387 let mut new_result = Vec::new();
388 for array in result {
389 #[allow(clippy::redundant_closure_for_method_calls)]
390 new_result.extend(array.chunks(maximum_size).map(|v| v.to_vec()));
391 }
392
393 result = new_result;
394 }
395
396 Ok(result)
397 }
398
    /// The scheduler backend selected when the project was opened.
    pub fn scheduler(&self) -> &dyn Scheduler {
        self.scheduler.as_ref()
    }
403
    /// Record that `directories` were submitted for `action_name` under
    /// `job_id` on the current cluster.
    pub fn add_submitted(&mut self, action_name: &str, directories: &[PathBuf], job_id: u32) {
        self.state
            .add_submitted(action_name, directories, &self.cluster_name, job_id);
    }
409}
410
411#[cfg(test)]
412mod tests {
413 use assert_fs::prelude::*;
414 use assert_fs::TempDir;
415 use indicatif::{MultiProgress, ProgressDrawTarget};
416 use serde_json::Value;
417 use serial_test::serial;
418 use std::env;
419
420 use super::*;
421 use crate::workflow::Comparison;
422
    /// Build a temporary project with `n` workspace directories.
    ///
    /// Directory `dir{i}` gets a value file `v` containing
    /// `{"i": i, "j": (n - 1 - i) / 2}` and the product file `one`; the
    /// first half (`i < n / 2`) additionally get product `two`. The
    /// workflow defines action "one" (no selectors), "two" (including
    /// only `/i < n - 2`), and "three" (previous action "two").
    ///
    /// NOTE(review): `temp` is dropped when this returns, which removes the
    /// temporary tree; tests appear to rely only on the in-memory state
    /// loaded during `Project::open` — confirm before reordering.
    fn setup(n: usize) -> Project {
        // Initialize logging once; ignore the error from repeat calls.
        let _ = env_logger::builder()
            .filter_level(log::LevelFilter::max())
            .is_test(true)
            .try_init();

        // Hidden progress output keeps test logs clean.
        let multi_progress = MultiProgress::with_draw_target(ProgressDrawTarget::hidden());
        let mut multi_progress = MultiProgressContainer {
            progress_bars: Vec::new(),
            multi_progress,
        };

        // Tests run #[serial] because this changes the process-wide cwd.
        let temp = TempDir::new().unwrap();
        env::set_current_dir(temp.path()).unwrap();
        for i in 0..n {
            let directory = temp.child("workspace").child(format!("dir{i}"));
            directory.create_dir_all().unwrap();
            directory
                .child("v")
                .write_str(&format!(r#"{{"i": {}, "j": {}}}"#, i, (n - 1 - i) / 2))
                .unwrap();

            if i < n / 2 {
                directory.child("two").touch().unwrap();
            }
            directory.child("one").touch().unwrap();
        }

        let workflow = format!(
            r#"
[workspace]
value_file = "v"

[[action]]
name = "one"
command = "c"
products = ["one"]

[[action]]
name = "two"
command = "c"
products = ["two"]
[[action.group.include]]
condition = ["/i", "<", {}]

[[action]]
name = "three"
command = "c"
products = ["three"]
previous_actions = ["two"]
"#,
            n - 2
        );

        temp.child("workflow.toml").write_str(&workflow).unwrap();

        Project::open(2, &None, &mut multi_progress).unwrap()
    }
481
    // Exercises `find_matching_directories` selector semantics: no selectors,
    // a single Condition, an All selector (AND), and multiple selectors (OR).
    #[test]
    #[serial]
    fn matching() {
        let project = setup(8);

        let mut all_directories = project.state().list_directories();
        all_directories.sort_unstable();

        // Action "one" has no include selectors: every directory matches.
        let action = &project.workflow.action[0];
        assert_eq!(
            project
                .find_matching_directories(action, all_directories.clone())
                .unwrap(),
            all_directories[0..8]
        );

        // Action "two" includes `/i < 6` (n - 2): the first six match.
        let action = &project.workflow.action[1];
        assert_eq!(
            project
                .find_matching_directories(action, all_directories.clone())
                .unwrap(),
            all_directories[0..6]
        );

        // An All selector requires every condition: 4 < i < 6 -> only dir5.
        let mut action = project.workflow.action[1].clone();
        let include = action.group.include.as_mut().unwrap();
        include.clear();
        include.push(Selector::All(vec![
            ("/i".into(), Comparison::GreaterThan, Value::from(4)),
            ("/i".into(), Comparison::LessThan, Value::from(6)),
        ]));
        assert_eq!(
            project
                .find_matching_directories(&action, all_directories.clone())
                .unwrap(),
            vec![PathBuf::from("dir5")]
        );

        // Multiple selectors are OR'd: i < 1 or i > 6 -> dir0 and dir7.
        let mut action = project.workflow.action[1].clone();
        let include = action.group.include.as_mut().unwrap();
        include.clear();
        include.push(Selector::Condition((
            "/i".into(),
            Comparison::LessThan,
            Value::from(1),
        )));

        include.push(Selector::Condition((
            "/i".into(),
            Comparison::GreaterThan,
            Value::from(6),
        )));

        assert_eq!(
            project
                .find_matching_directories(&action, all_directories.clone())
                .unwrap(),
            vec![PathBuf::from("dir0"), PathBuf::from("dir7")]
        );
    }
544
    // Exercises `separate_by_status` against the fixture's product files.
    #[test]
    #[serial]
    fn status() {
        let project = setup(8);

        let mut all_directories = project.state().list_directories();
        all_directories.sort_unstable();

        // Action "one": product file "one" exists everywhere -> all completed.
        let action = &project.workflow.action[0];
        let status = project
            .separate_by_status(action, all_directories.clone())
            .unwrap();
        assert_eq!(status.completed, all_directories);
        assert!(status.submitted.is_empty());
        assert!(status.eligible.is_empty());
        assert!(status.waiting.is_empty());

        // Action "two": only the first half have product "two"; the rest
        // have no previous actions, so they are eligible.
        let action = &project.workflow.action[1];
        let status = project
            .separate_by_status(action, all_directories.clone())
            .unwrap();
        assert_eq!(status.completed, all_directories[0..4]);
        assert!(status.submitted.is_empty());
        assert_eq!(status.eligible, all_directories[4..8]);
        assert!(status.waiting.is_empty());

        // Action "three" depends on "two": directories where "two" completed
        // are eligible; the rest are waiting.
        let action = &project.workflow.action[2];
        let status = project
            .separate_by_status(action, all_directories.clone())
            .unwrap();
        assert!(status.completed.is_empty());
        assert!(status.submitted.is_empty());
        assert_eq!(status.eligible, all_directories[0..4]);
        assert_eq!(status.waiting, all_directories[4..8]);
    }
580
581 #[test]
582 #[serial]
583 fn group() {
584 let project = setup(8);
585
586 let mut all_directories = project.state().list_directories();
587 all_directories.sort_unstable();
588
589 let action = &project.workflow.action[0];
590 let groups = project
591 .separate_into_groups(action, all_directories.clone())
592 .unwrap();
593 assert_eq!(groups, vec![all_directories]);
594 }
595
596 #[test]
597 #[serial]
598 fn group_reverse() {
599 let project = setup(8);
600
601 let mut all_directories = project.state().list_directories();
602 all_directories.sort_unstable();
603 let mut reversed = all_directories.clone();
604 reversed.reverse();
605
606 let mut action = project.workflow.action[0].clone();
607 action.group.reverse_sort = Some(true);
608 let groups = project
609 .separate_into_groups(&action, all_directories.clone())
610 .unwrap();
611 assert_eq!(groups, vec![reversed]);
612 }
613
614 #[test]
615 #[serial]
616 fn group_max_size() {
617 let project = setup(8);
618
619 let mut all_directories = project.state().list_directories();
620 all_directories.sort_unstable();
621
622 let mut action = project.workflow.action[0].clone();
623 action.group.maximum_size = Some(3);
624 let groups = project
625 .separate_into_groups(&action, all_directories.clone())
626 .unwrap();
627 assert_eq!(
628 groups,
629 vec![
630 all_directories[0..3].to_vec(),
631 all_directories[3..6].to_vec(),
632 all_directories[6..8].to_vec()
633 ]
634 );
635 }
636
    // Sorting by `/j`, where the fixture sets j = (7 - i) / 2: pairs
    // (dir6, dir7), (dir4, dir5), (dir2, dir3), (dir0, dir1) have
    // j = 0, 1, 2, 3. The stable sort keeps each pair in name order,
    // yielding one group in ascending-j order.
    #[test]
    #[serial]
    fn group_sort() {
        let project = setup(8);

        let mut all_directories = project.state().list_directories();
        all_directories.sort_unstable();

        let mut action = project.workflow.action[0].clone();
        action.group.sort_by = Some(vec!["/j".to_string()]);
        let groups = project
            .separate_into_groups(&action, all_directories.clone())
            .unwrap();
        assert_eq!(
            groups,
            vec![vec![
                PathBuf::from("dir6"),
                PathBuf::from("dir7"),
                PathBuf::from("dir4"),
                PathBuf::from("dir5"),
                PathBuf::from("dir2"),
                PathBuf::from("dir3"),
                PathBuf::from("dir0"),
                PathBuf::from("dir1")
            ]]
        );
    }
664
    // As in `group_sort`, directories sort by `/j` = (7 - i) / 2; with
    // `split_by_sort_key` each run of equal j becomes its own group, so the
    // four j-values 0..=3 produce four groups of two.
    #[test]
    #[serial]
    fn group_sort_and_split() {
        let project = setup(8);

        let mut all_directories = project.state().list_directories();
        all_directories.sort_unstable();

        let mut action = project.workflow.action[0].clone();
        action.group.sort_by = Some(vec!["/j".to_string()]);
        action.group.split_by_sort_key = Some(true);
        let groups = project
            .separate_into_groups(&action, all_directories.clone())
            .unwrap();
        assert_eq!(
            groups,
            vec![
                vec![PathBuf::from("dir6"), PathBuf::from("dir7")],
                vec![PathBuf::from("dir4"), PathBuf::from("dir5")],
                vec![PathBuf::from("dir2"), PathBuf::from("dir3")],
                vec![PathBuf::from("dir0"), PathBuf::from("dir1")]
            ]
        );
    }
689}