row/
project.rs

1// Copyright (c) 2024 The Regents of the University of Michigan.
2// Part of row, released under the BSD 3-Clause License.
3
4use indicatif::ProgressBar;
5use log::{debug, trace, warn};
6use serde_json::Value;
7use std::cmp::Ordering;
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::time::Duration;
11
12use crate::cluster::{self, SchedulerType};
13use crate::expr;
14use crate::launcher;
15use crate::progress_styles;
16use crate::scheduler::bash::Bash;
17use crate::scheduler::slurm::Slurm;
18use crate::scheduler::Scheduler;
19use crate::state::State;
20use crate::workflow::{Action, Selector, Workflow};
21use crate::{Error, MultiProgressContainer};
22
/// Encapsulate the workflow, state, and scheduler into a project.
///
/// When opened, `Project`:
///
/// * Reads caches from disk.
/// * Gets the status of submitted jobs from the scheduler.
/// * Collects the staged completions.
/// * Reads the workflow file.
/// * Synchronizes the system state with the workspace on disk.
/// * And removes any completed jobs from the submitted cache.
///
/// These are common operations used by many CLI commands. A command that needs
/// only a subset of these should use the individual classes directly.
///
pub struct Project {
    /// The project's workflow definition.
    workflow: Workflow,

    /// The state associated with the directories in the project.
    state: State,

    /// The scheduler chosen for the identified cluster (Bash or Slurm).
    scheduler: Box<dyn Scheduler>,

    /// The name of the cluster identified when the project was opened.
    cluster_name: String,
}
50
/// Store individual sets of jobs, separated by status for a given action.
///
/// Each input directory is placed in exactly one of the four lists.
/// Call `Project::separate_by_status` to produce a `Status`.
///
#[derive(Debug)]
pub struct Status {
    /// Directories that have completed the action.
    pub completed: Vec<PathBuf>,

    /// Directories that have been submitted to the scheduler.
    pub submitted: Vec<PathBuf>,

    /// Directories that are eligible to execute: not completed, not
    /// submitted, and every previous action has completed.
    pub eligible: Vec<PathBuf>,

    /// Directories that are waiting on previous actions to complete.
    pub waiting: Vec<PathBuf>,
}
69
70impl Project {
71    /// Open a project from the current working directory or any parents.
72    ///
73    /// # Errors
74    /// Returns `Err<row::Error>` when the project cannot be opened.
75    ///
76    pub fn open(
77        io_threads: u16,
78        cluster_name: &Option<String>,
79        multi_progress: &mut MultiProgressContainer,
80    ) -> Result<Project, Error> {
81        trace!("Opening project.");
82        let workflow = Workflow::open()?;
83        let clusters = cluster::Configuration::open()?;
84        let cluster = clusters.identify(cluster_name.as_deref())?;
85        let launchers = launcher::Configuration::open()?.by_cluster(&cluster.name);
86        let cluster_name = cluster.name.clone();
87
88        let scheduler: Box<dyn Scheduler> = match cluster.scheduler {
89            SchedulerType::Bash => Box::new(Bash::new(cluster, launchers)),
90            SchedulerType::Slurm => Box::new(Slurm::new(cluster, launchers)),
91        };
92
93        let mut state = State::from_cache(&workflow)?;
94
95        // squeue will likely take the longest to finish, start it first.
96        let jobs = state.jobs_submitted_on(&cluster_name);
97        let mut progress =
98            ProgressBar::new_spinner().with_message("Checking submitted job statuses");
99        progress = multi_progress.add_or_hide(progress, jobs.is_empty());
100
101        progress.enable_steady_tick(Duration::from_millis(progress_styles::STEADY_TICK));
102        progress.set_style(progress_styles::uncounted_spinner());
103        progress.tick();
104
105        let active_jobs = scheduler.active_jobs(&jobs)?;
106
107        // Then synchronize with the workspace while squeue is running.
108        state.synchronize_workspace(&workflow, io_threads, multi_progress)?;
109
110        // Now, wait for squeue to finish and remove any inactive jobs.
111        let active_jobs = active_jobs.get()?;
112        progress.finish();
113
114        if active_jobs.len() != jobs.len() {
115            state.remove_inactive_submitted(&cluster_name, &active_jobs);
116        } else if !jobs.is_empty() {
117            trace!("All submitted jobs remain active on {cluster_name}.");
118        }
119
120        Ok(Self {
121            workflow,
122            state,
123            scheduler,
124            cluster_name,
125        })
126    }
127
128    /// Close the project.
129    ///
130    /// Closing saves the updated cache to disk and removes any temporary
131    /// completion pack files. `Project` does not automatically close when
132    /// dropped as the caller may not wish to save the cache when there is
133    /// an error after opening the project.
134    ///
135    /// # Errors
136    /// Returns `Err<row::Error>` when there is an error taking these steps.
137    ///
138    pub fn close(&mut self, multi_progress: &mut MultiProgressContainer) -> Result<(), Error> {
139        debug!("Closing project.");
140
141        self.state.save_cache(&self.workflow, multi_progress)?;
142
143        Ok(())
144    }
145
    /// Get the project's workflow definition.
    ///
    /// Returns a shared borrow of the workflow that was read when the project
    /// was opened.
    pub fn workflow(&self) -> &Workflow {
        &self.workflow
    }
150
    /// Get the state of the project's workspace.
    ///
    /// Returns a shared borrow; the state cannot be modified through it.
    pub fn state(&self) -> &State {
        &self.state
    }
155
    /// Get the currently active cluster name.
    ///
    /// This is the name of the cluster identified when the project was opened.
    pub fn cluster_name(&self) -> &String {
        &self.cluster_name
    }
160
161    /// Find the directories that are included by the action.
162    ///
163    /// # Parameters:
164    /// - `action`: The action to match.
165    /// - `directories`: Directories to match.
166    ///
167    /// # Returns
168    /// `Ok(Vec<PathBuf>)` listing directories from `directories` that match
169    /// the action's **include** directive.
170    ///
171    /// # Errors
172    /// `Err(row::Error)` when any action's include pointer cannot be resolved.
173    ///
174    /// # Warnings
175    /// Logs with `warn!` when `subset` contains directories that are not
176    /// present in the workspace.
177    ///
178    pub fn find_matching_directories(
179        &self,
180        action: &Action,
181        directories: Vec<PathBuf>,
182    ) -> Result<Vec<PathBuf>, Error> {
183        trace!(
184            "Finding directories that action '{}' includes.",
185            action.name()
186        );
187
188        let mut matching_directories = Vec::with_capacity(directories.len());
189
190        'outer: for name in directories {
191            if let Some(value) = self.state.values().get(&name) {
192                if action.group.include().is_empty() {
193                    matching_directories.push(name);
194                } else {
195                    for selector in action.group.include() {
196                        let result = match selector {
197                            Selector::Condition((include, comparison, expected)) => {
198                                let actual = value.pointer(include).ok_or_else(|| {
199                                    Error::JSONPointerNotFound(name.clone(), include.clone())
200                                })?;
201
202                                expr::evaluate_json_comparison(comparison, actual, expected)
203                                    .ok_or_else(|| {
204                                        Error::CannotCompareInclude(
205                                            actual.clone(),
206                                            expected.clone(),
207                                            name.clone(),
208                                        )
209                                    })
210                            }
211
212                            Selector::All(conditions) => {
213                                let mut matches = 0;
214                                for (include, comparison, expected) in conditions {
215                                    let actual = value.pointer(include).ok_or_else(|| {
216                                        Error::JSONPointerNotFound(name.clone(), include.clone())
217                                    })?;
218
219                                    if !expr::evaluate_json_comparison(comparison, actual, expected)
220                                        .ok_or_else(|| {
221                                            Error::CannotCompareInclude(
222                                                actual.clone(),
223                                                expected.clone(),
224                                                name.clone(),
225                                            )
226                                        })?
227                                    {
228                                        break;
229                                    }
230                                    matches += 1;
231                                }
232                                Ok(matches == conditions.len())
233                            }
234                        };
235
236                        if result? {
237                            matching_directories.push(name);
238                            continue 'outer;
239                        }
240                    }
241                }
242            } else {
243                warn!("Directory '{}' not found in workspace.", name.display());
244            }
245        }
246
247        trace!("Found {} match(es).", matching_directories.len());
248        Ok(matching_directories)
249    }
250
251    /// Separate a set of directories by their status.
252    ///
253    /// # Parameters:
254    /// - `action`: Report the status for this action.
255    /// - `directories`: Directories to separate.
256    ///
257    /// # Returns
258    /// `Ok(Status)` listing all input `directories` in categories.
259    ///
260    /// # Errors
261    /// `Err(row::Error)` when a given directory is not present.
262    ///
263    pub fn separate_by_status(
264        &self,
265        action: &Action,
266        directories: Vec<PathBuf>,
267    ) -> Result<Status, Error> {
268        trace!(
269            "Separating {} directories by status for '{}'.",
270            directories.len(),
271            action.name()
272        );
273        let capacity = directories.capacity();
274        let mut status = Status {
275            completed: Vec::with_capacity(capacity),
276            submitted: Vec::with_capacity(capacity),
277            eligible: Vec::with_capacity(capacity),
278            waiting: Vec::with_capacity(capacity),
279        };
280
281        for directory_name in directories {
282            if !self.state.values().contains_key(&directory_name) {
283                return Err(Error::DirectoryNotFound(directory_name));
284            }
285
286            let completed = self.state.completed();
287
288            if completed[action.name()].contains(&directory_name) {
289                status.completed.push(directory_name);
290            } else if self.state.is_submitted(action.name(), &directory_name) {
291                status.submitted.push(directory_name);
292            } else if action
293                .previous_actions()
294                .iter()
295                .all(|a| completed[a].contains(&directory_name))
296            {
297                status.eligible.push(directory_name);
298            } else {
299                status.waiting.push(directory_name);
300            }
301        }
302
303        Ok(status)
304    }
305
306    /// Separate directories into groups based on the given parameters
307    ///
308    /// # Errors
309    /// `Err(row::Error)` when a given directory is not present or a JSON
310    /// pointer used for sorting is not present.
311    ///
312    /// # Panics
313    /// When two JSON pointers are not valid for comparison.
314    ///
315    pub fn separate_into_groups(
316        &self,
317        action: &Action,
318        mut directories: Vec<PathBuf>,
319    ) -> Result<Vec<Vec<PathBuf>>, Error> {
320        trace!(
321            "Separating {} directories into groups for '{}'.",
322            directories.len(),
323            action.name()
324        );
325
326        if directories.is_empty() {
327            return Ok(Vec::new());
328        }
329
330        // First, sort the directories by name.
331        directories.sort_unstable();
332
333        // Determine the user-provided sort keys.
334        let mut sort_keys = HashMap::new();
335        for directory_name in &directories {
336            let value = self
337                .state
338                .values()
339                .get(directory_name)
340                .ok_or_else(|| Error::DirectoryNotFound(directory_name.clone()))?;
341
342            let mut sort_key = Vec::new();
343            for pointer in action.group.sort_by() {
344                let element = value.pointer(pointer).ok_or_else(|| {
345                    Error::JSONPointerNotFound(directory_name.clone(), pointer.clone())
346                })?;
347                sort_key.push(element.clone());
348            }
349            sort_keys.insert(directory_name.clone(), Value::Array(sort_key));
350        }
351
352        // Sort by key when there are keys to sort by.
353        let mut result = Vec::new();
354        if action.group.sort_by().is_empty() {
355            if action.group.reverse_sort() {
356                directories.reverse();
357            }
358            result.push(directories);
359        } else {
360            directories.sort_by(|a, b| {
361                expr::partial_cmp_json_values(&sort_keys[a], &sort_keys[b])
362                    .expect("Valid JSON comparison")
363            });
364
365            if action.group.reverse_sort() {
366                directories.reverse();
367            }
368
369            // Split by the sort key when requested.
370            #[allow(clippy::redundant_closure_for_method_calls)]
371            if action.group.split_by_sort_key() {
372                result.extend(
373                    directories
374                        .chunk_by(|a, b| {
375                            expr::partial_cmp_json_values(&sort_keys[a], &sort_keys[b])
376                                .expect("Valid JSON comparison")
377                                == Ordering::Equal
378                        })
379                        .map(|v| v.to_vec()),
380                );
381            } else {
382                result.push(directories);
383            }
384        }
385
386        if let Some(maximum_size) = action.group.maximum_size {
387            let mut new_result = Vec::new();
388            for array in result {
389                #[allow(clippy::redundant_closure_for_method_calls)]
390                new_result.extend(array.chunks(maximum_size).map(|v| v.to_vec()));
391            }
392
393            result = new_result;
394        }
395
396        Ok(result)
397    }
398
399    /// Get the scheduler.
400    pub fn scheduler(&self) -> &dyn Scheduler {
401        self.scheduler.as_ref()
402    }
403
404    /// Add a new submitted job.
405    pub fn add_submitted(&mut self, action_name: &str, directories: &[PathBuf], job_id: u32) {
406        self.state
407            .add_submitted(action_name, directories, &self.cluster_name, job_id);
408    }
409}
410
411#[cfg(test)]
412mod tests {
413    use assert_fs::prelude::*;
414    use assert_fs::TempDir;
415    use indicatif::{MultiProgress, ProgressDrawTarget};
416    use serde_json::Value;
417    use serial_test::serial;
418    use std::env;
419
420    use super::*;
421    use crate::workflow::Comparison;
422
423    fn setup(n: usize) -> Project {
424        let _ = env_logger::builder()
425            .filter_level(log::LevelFilter::max())
426            .is_test(true)
427            .try_init();
428
429        let multi_progress = MultiProgress::with_draw_target(ProgressDrawTarget::hidden());
430        let mut multi_progress = MultiProgressContainer {
431            progress_bars: Vec::new(),
432            multi_progress,
433        };
434
435        let temp = TempDir::new().unwrap();
436        env::set_current_dir(temp.path()).unwrap();
437        for i in 0..n {
438            let directory = temp.child("workspace").child(format!("dir{i}"));
439            directory.create_dir_all().unwrap();
440            directory
441                .child("v")
442                .write_str(&format!(r#"{{"i": {}, "j": {}}}"#, i, (n - 1 - i) / 2))
443                .unwrap();
444
445            if i < n / 2 {
446                directory.child("two").touch().unwrap();
447            }
448            directory.child("one").touch().unwrap();
449        }
450
451        let workflow = format!(
452            r#"
453[workspace]
454value_file = "v"
455
456[[action]]
457name = "one"
458command = "c"
459products = ["one"]
460
461[[action]]
462name = "two"
463command = "c"
464products = ["two"]
465[[action.group.include]]
466condition = ["/i", "<", {}]
467
468[[action]]
469name = "three"
470command = "c"
471products = ["three"]
472previous_actions = ["two"]
473"#,
474            n - 2
475        );
476
477        temp.child("workflow.toml").write_str(&workflow).unwrap();
478
479        Project::open(2, &None, &mut multi_progress).unwrap()
480    }
481
482    #[test]
483    #[serial]
484    fn matching() {
485        let project = setup(8);
486
487        let mut all_directories = project.state().list_directories();
488        all_directories.sort_unstable();
489
490        let action = &project.workflow.action[0];
491        assert_eq!(
492            project
493                .find_matching_directories(action, all_directories.clone())
494                .unwrap(),
495            all_directories[0..8]
496        );
497
498        let action = &project.workflow.action[1];
499        assert_eq!(
500            project
501                .find_matching_directories(action, all_directories.clone())
502                .unwrap(),
503            all_directories[0..6]
504        );
505
506        // Check all conditions.
507        let mut action = project.workflow.action[1].clone();
508        let include = action.group.include.as_mut().unwrap();
509        include.clear();
510        include.push(Selector::All(vec![
511            ("/i".into(), Comparison::GreaterThan, Value::from(4)),
512            ("/i".into(), Comparison::LessThan, Value::from(6)),
513        ]));
514        assert_eq!(
515            project
516                .find_matching_directories(&action, all_directories.clone())
517                .unwrap(),
518            vec![PathBuf::from("dir5")]
519        );
520
521        // Check any conditions.
522        let mut action = project.workflow.action[1].clone();
523        let include = action.group.include.as_mut().unwrap();
524        include.clear();
525        include.push(Selector::Condition((
526            "/i".into(),
527            Comparison::LessThan,
528            Value::from(1),
529        )));
530
531        include.push(Selector::Condition((
532            "/i".into(),
533            Comparison::GreaterThan,
534            Value::from(6),
535        )));
536
537        assert_eq!(
538            project
539                .find_matching_directories(&action, all_directories.clone())
540                .unwrap(),
541            vec![PathBuf::from("dir0"), PathBuf::from("dir7")]
542        );
543    }
544
545    #[test]
546    #[serial]
547    fn status() {
548        let project = setup(8);
549
550        let mut all_directories = project.state().list_directories();
551        all_directories.sort_unstable();
552
553        let action = &project.workflow.action[0];
554        let status = project
555            .separate_by_status(action, all_directories.clone())
556            .unwrap();
557        assert_eq!(status.completed, all_directories);
558        assert!(status.submitted.is_empty());
559        assert!(status.eligible.is_empty());
560        assert!(status.waiting.is_empty());
561
562        let action = &project.workflow.action[1];
563        let status = project
564            .separate_by_status(action, all_directories.clone())
565            .unwrap();
566        assert_eq!(status.completed, all_directories[0..4]);
567        assert!(status.submitted.is_empty());
568        assert_eq!(status.eligible, all_directories[4..8]);
569        assert!(status.waiting.is_empty());
570
571        let action = &project.workflow.action[2];
572        let status = project
573            .separate_by_status(action, all_directories.clone())
574            .unwrap();
575        assert!(status.completed.is_empty());
576        assert!(status.submitted.is_empty());
577        assert_eq!(status.eligible, all_directories[0..4]);
578        assert_eq!(status.waiting, all_directories[4..8]);
579    }
580
581    #[test]
582    #[serial]
583    fn group() {
584        let project = setup(8);
585
586        let mut all_directories = project.state().list_directories();
587        all_directories.sort_unstable();
588
589        let action = &project.workflow.action[0];
590        let groups = project
591            .separate_into_groups(action, all_directories.clone())
592            .unwrap();
593        assert_eq!(groups, vec![all_directories]);
594    }
595
596    #[test]
597    #[serial]
598    fn group_reverse() {
599        let project = setup(8);
600
601        let mut all_directories = project.state().list_directories();
602        all_directories.sort_unstable();
603        let mut reversed = all_directories.clone();
604        reversed.reverse();
605
606        let mut action = project.workflow.action[0].clone();
607        action.group.reverse_sort = Some(true);
608        let groups = project
609            .separate_into_groups(&action, all_directories.clone())
610            .unwrap();
611        assert_eq!(groups, vec![reversed]);
612    }
613
614    #[test]
615    #[serial]
616    fn group_max_size() {
617        let project = setup(8);
618
619        let mut all_directories = project.state().list_directories();
620        all_directories.sort_unstable();
621
622        let mut action = project.workflow.action[0].clone();
623        action.group.maximum_size = Some(3);
624        let groups = project
625            .separate_into_groups(&action, all_directories.clone())
626            .unwrap();
627        assert_eq!(
628            groups,
629            vec![
630                all_directories[0..3].to_vec(),
631                all_directories[3..6].to_vec(),
632                all_directories[6..8].to_vec()
633            ]
634        );
635    }
636
637    #[test]
638    #[serial]
639    fn group_sort() {
640        let project = setup(8);
641
642        let mut all_directories = project.state().list_directories();
643        all_directories.sort_unstable();
644
645        let mut action = project.workflow.action[0].clone();
646        action.group.sort_by = Some(vec!["/j".to_string()]);
647        let groups = project
648            .separate_into_groups(&action, all_directories.clone())
649            .unwrap();
650        assert_eq!(
651            groups,
652            vec![vec![
653                PathBuf::from("dir6"),
654                PathBuf::from("dir7"),
655                PathBuf::from("dir4"),
656                PathBuf::from("dir5"),
657                PathBuf::from("dir2"),
658                PathBuf::from("dir3"),
659                PathBuf::from("dir0"),
660                PathBuf::from("dir1")
661            ]]
662        );
663    }
664
665    #[test]
666    #[serial]
667    fn group_sort_and_split() {
668        let project = setup(8);
669
670        let mut all_directories = project.state().list_directories();
671        all_directories.sort_unstable();
672
673        let mut action = project.workflow.action[0].clone();
674        action.group.sort_by = Some(vec!["/j".to_string()]);
675        action.group.split_by_sort_key = Some(true);
676        let groups = project
677            .separate_into_groups(&action, all_directories.clone())
678            .unwrap();
679        assert_eq!(
680            groups,
681            vec![
682                vec![PathBuf::from("dir6"), PathBuf::from("dir7")],
683                vec![PathBuf::from("dir4"), PathBuf::from("dir5")],
684                vec![PathBuf::from("dir2"), PathBuf::from("dir3")],
685                vec![PathBuf::from("dir0"), PathBuf::from("dir1")]
686            ]
687        );
688    }
689}