Skip to main content

opal/execution_plan/
types.rs

1use crate::compiler::{JobInstance, JobVariantInfo};
2use crate::model::JobDependencySpec;
3use anyhow::{Result, anyhow};
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::path::PathBuf;
6
7#[derive(Debug, Clone)]
8pub struct ExecutableJob {
9    pub instance: JobInstance,
10    pub log_path: PathBuf,
11    pub log_hash: String,
12}
13
14#[derive(Debug)]
15pub struct ExecutionPlan {
16    pub ordered: Vec<String>,
17    pub nodes: HashMap<String, ExecutableJob>,
18    pub dependents: HashMap<String, Vec<String>>,
19    pub order_index: HashMap<String, usize>,
20    pub variants: HashMap<String, Vec<JobVariantInfo>>,
21}
22
23impl ExecutionPlan {
24    pub fn variants_for_dependency(&self, dep: &JobDependencySpec) -> Vec<String> {
25        let Some(entries) = self.variants.get(&dep.job) else {
26            return Vec::new();
27        };
28        select_variants(entries, dep)
29            .into_iter()
30            .map(|variant| variant.name.clone())
31            .collect()
32    }
33
34    pub fn select_jobs(&self, selectors: &[String]) -> Result<Self> {
35        if selectors.is_empty() {
36            return Ok(self.clone());
37        }
38
39        let mut requested = HashSet::new();
40        for selector in selectors {
41            let matches = self.resolve_selector(selector);
42            if matches.is_empty() {
43                return Err(anyhow!(
44                    "selected job '{}' was not found in execution plan",
45                    selector
46                ));
47            }
48            requested.extend(matches);
49        }
50
51        let mut keep = requested.clone();
52        let mut queue: VecDeque<String> = requested.into_iter().collect();
53        while let Some(name) = queue.pop_front() {
54            let Some(planned) = self.nodes.get(&name) else {
55                continue;
56            };
57            for dep in &planned.instance.dependencies {
58                if keep.insert(dep.clone()) {
59                    queue.push_back(dep.clone());
60                }
61            }
62        }
63
64        let ordered = self
65            .ordered
66            .iter()
67            .filter(|name| keep.contains(*name))
68            .cloned()
69            .collect::<Vec<_>>();
70        let nodes = self
71            .nodes
72            .iter()
73            .filter(|(name, _)| keep.contains(*name))
74            .map(|(name, planned)| (name.clone(), planned.clone()))
75            .collect::<HashMap<_, _>>();
76        let dependents = self
77            .dependents
78            .iter()
79            .filter_map(|(name, downstream)| {
80                if !keep.contains(name) {
81                    return None;
82                }
83                let filtered = downstream
84                    .iter()
85                    .filter(|child| keep.contains(*child))
86                    .cloned()
87                    .collect::<Vec<_>>();
88                Some((name.clone(), filtered))
89            })
90            .collect::<HashMap<_, _>>();
91        let order_index = ordered
92            .iter()
93            .enumerate()
94            .map(|(idx, name)| (name.clone(), idx))
95            .collect::<HashMap<_, _>>();
96        let variants = self
97            .variants
98            .iter()
99            .filter_map(|(base, entries)| {
100                let filtered = entries
101                    .iter()
102                    .filter(|entry| keep.contains(&entry.name))
103                    .cloned()
104                    .collect::<Vec<_>>();
105                if filtered.is_empty() {
106                    None
107                } else {
108                    Some((base.clone(), filtered))
109                }
110            })
111            .collect::<HashMap<_, _>>();
112
113        Ok(Self {
114            ordered,
115            nodes,
116            dependents,
117            order_index,
118            variants,
119        })
120    }
121
122    fn resolve_selector(&self, selector: &str) -> HashSet<String> {
123        let mut matches = HashSet::new();
124        if self.nodes.contains_key(selector) {
125            matches.insert(selector.to_string());
126        }
127        if let Some(entries) = self.variants.get(selector) {
128            matches.extend(entries.iter().map(|entry| entry.name.clone()));
129        }
130        matches
131    }
132}
133
134fn select_variants<'a>(
135    variants: &'a [JobVariantInfo],
136    dep: &JobDependencySpec,
137) -> Vec<&'a JobVariantInfo> {
138    // TODO: this is the nth function like this, stop it, please
139    if let Some(filters) = &dep.parallel {
140        variants
141            .iter()
142            .filter(|variant| {
143                filters.iter().any(|filter| {
144                    filter.iter().all(|(key, value)| {
145                        variant
146                            .labels
147                            .get(key)
148                            .map(|current| current == value)
149                            .unwrap_or(false)
150                    })
151                })
152            })
153            .collect()
154    } else if let Some(expected) = &dep.inline_variant {
155        variants
156            .iter()
157            .filter(|variant| &variant.ordered_values == expected)
158            .collect()
159    } else {
160        variants.iter().collect()
161    }
162}
163
164impl Clone for ExecutionPlan {
165    fn clone(&self) -> Self {
166        Self {
167            ordered: self.ordered.clone(),
168            nodes: self.nodes.clone(),
169            dependents: self.dependents.clone(),
170            order_index: self.order_index.clone(),
171            variants: self.variants.clone(),
172        }
173    }
174}