opal/execution_plan/
types.rs1use crate::compiler::{JobInstance, JobVariantInfo};
2use crate::model::JobDependencySpec;
3use anyhow::{Result, anyhow};
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::path::PathBuf;
6
7#[derive(Debug, Clone)]
8pub struct ExecutableJob {
9 pub instance: JobInstance,
10 pub log_path: PathBuf,
11 pub log_hash: String,
12}
13
14#[derive(Debug)]
15pub struct ExecutionPlan {
16 pub ordered: Vec<String>,
17 pub nodes: HashMap<String, ExecutableJob>,
18 pub dependents: HashMap<String, Vec<String>>,
19 pub order_index: HashMap<String, usize>,
20 pub variants: HashMap<String, Vec<JobVariantInfo>>,
21}
22
23impl ExecutionPlan {
24 pub fn variants_for_dependency(&self, dep: &JobDependencySpec) -> Vec<String> {
25 let Some(entries) = self.variants.get(&dep.job) else {
26 return Vec::new();
27 };
28 select_variants(entries, dep)
29 .into_iter()
30 .map(|variant| variant.name.clone())
31 .collect()
32 }
33
34 pub fn select_jobs(&self, selectors: &[String]) -> Result<Self> {
35 if selectors.is_empty() {
36 return Ok(self.clone());
37 }
38
39 let mut requested = HashSet::new();
40 for selector in selectors {
41 let matches = self.resolve_selector(selector);
42 if matches.is_empty() {
43 return Err(anyhow!(
44 "selected job '{}' was not found in execution plan",
45 selector
46 ));
47 }
48 requested.extend(matches);
49 }
50
51 let mut keep = requested.clone();
52 let mut queue: VecDeque<String> = requested.into_iter().collect();
53 while let Some(name) = queue.pop_front() {
54 let Some(planned) = self.nodes.get(&name) else {
55 continue;
56 };
57 for dep in &planned.instance.dependencies {
58 if keep.insert(dep.clone()) {
59 queue.push_back(dep.clone());
60 }
61 }
62 }
63
64 let ordered = self
65 .ordered
66 .iter()
67 .filter(|name| keep.contains(*name))
68 .cloned()
69 .collect::<Vec<_>>();
70 let nodes = self
71 .nodes
72 .iter()
73 .filter(|(name, _)| keep.contains(*name))
74 .map(|(name, planned)| (name.clone(), planned.clone()))
75 .collect::<HashMap<_, _>>();
76 let dependents = self
77 .dependents
78 .iter()
79 .filter_map(|(name, downstream)| {
80 if !keep.contains(name) {
81 return None;
82 }
83 let filtered = downstream
84 .iter()
85 .filter(|child| keep.contains(*child))
86 .cloned()
87 .collect::<Vec<_>>();
88 Some((name.clone(), filtered))
89 })
90 .collect::<HashMap<_, _>>();
91 let order_index = ordered
92 .iter()
93 .enumerate()
94 .map(|(idx, name)| (name.clone(), idx))
95 .collect::<HashMap<_, _>>();
96 let variants = self
97 .variants
98 .iter()
99 .filter_map(|(base, entries)| {
100 let filtered = entries
101 .iter()
102 .filter(|entry| keep.contains(&entry.name))
103 .cloned()
104 .collect::<Vec<_>>();
105 if filtered.is_empty() {
106 None
107 } else {
108 Some((base.clone(), filtered))
109 }
110 })
111 .collect::<HashMap<_, _>>();
112
113 Ok(Self {
114 ordered,
115 nodes,
116 dependents,
117 order_index,
118 variants,
119 })
120 }
121
122 fn resolve_selector(&self, selector: &str) -> HashSet<String> {
123 let mut matches = HashSet::new();
124 if self.nodes.contains_key(selector) {
125 matches.insert(selector.to_string());
126 }
127 if let Some(entries) = self.variants.get(selector) {
128 matches.extend(entries.iter().map(|entry| entry.name.clone()));
129 }
130 matches
131 }
132}
133
134fn select_variants<'a>(
135 variants: &'a [JobVariantInfo],
136 dep: &JobDependencySpec,
137) -> Vec<&'a JobVariantInfo> {
138 if let Some(filters) = &dep.parallel {
140 variants
141 .iter()
142 .filter(|variant| {
143 filters.iter().any(|filter| {
144 filter.iter().all(|(key, value)| {
145 variant
146 .labels
147 .get(key)
148 .map(|current| current == value)
149 .unwrap_or(false)
150 })
151 })
152 })
153 .collect()
154 } else if let Some(expected) = &dep.inline_variant {
155 variants
156 .iter()
157 .filter(|variant| &variant.ordered_values == expected)
158 .collect()
159 } else {
160 variants.iter().collect()
161 }
162}
163
164impl Clone for ExecutionPlan {
165 fn clone(&self) -> Self {
166 Self {
167 ordered: self.ordered.clone(),
168 nodes: self.nodes.clone(),
169 dependents: self.dependents.clone(),
170 order_index: self.order_index.clone(),
171 variants: self.variants.clone(),
172 }
173 }
174}