1use std::collections::{BTreeMap, BTreeSet};
2
3use serde::Serialize;
4
5#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
6#[serde(rename_all = "camelCase")]
7pub struct OmenaIncrementalBoundarySummaryV0 {
8 pub schema_version: &'static str,
9 pub product: &'static str,
10 pub engine_name: &'static str,
11 pub invalidation_model: &'static str,
12 pub node_identity: Vec<&'static str>,
13 pub dirty_reasons: Vec<&'static str>,
14 pub ready_surfaces: Vec<&'static str>,
15}
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
18#[serde(rename_all = "camelCase")]
19pub struct IncrementalRevisionV0 {
20 pub value: u64,
21}
22
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct IncrementalGraphInputV0 {
25 pub revision: IncrementalRevisionV0,
26 pub nodes: Vec<IncrementalNodeInputV0>,
27}
28
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub struct IncrementalNodeInputV0 {
31 pub id: String,
32 pub digest: String,
33 pub dependency_ids: Vec<String>,
34}
35
36#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
37#[serde(rename_all = "camelCase")]
38pub struct IncrementalSnapshotV0 {
39 pub schema_version: &'static str,
40 pub product: &'static str,
41 pub revision: IncrementalRevisionV0,
42 pub nodes: Vec<IncrementalSnapshotNodeV0>,
43}
44
45#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
46#[serde(rename_all = "camelCase")]
47pub struct IncrementalSnapshotNodeV0 {
48 pub id: String,
49 pub digest: String,
50 pub dependency_ids: Vec<String>,
51}
52
53#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
54#[serde(rename_all = "camelCase")]
55pub struct IncrementalComputationPlanV0 {
56 pub schema_version: &'static str,
57 pub product: &'static str,
58 pub revision: IncrementalRevisionV0,
59 pub node_count: usize,
60 pub dirty_node_count: usize,
61 pub changed_input_count: usize,
62 pub new_node_count: usize,
63 pub removed_node_count: usize,
64 pub dependency_dirty_count: usize,
65 pub nodes: Vec<IncrementalComputationNodeV0>,
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
69#[serde(rename_all = "camelCase")]
70pub struct IncrementalComputationNodeV0 {
71 pub id: String,
72 pub digest: String,
73 pub dependency_ids: Vec<String>,
74 pub dirty: bool,
75 pub reasons: Vec<&'static str>,
76}
77
78pub fn summarize_omena_incremental_boundary() -> OmenaIncrementalBoundarySummaryV0 {
79 OmenaIncrementalBoundarySummaryV0 {
80 schema_version: "0",
81 product: "omena-incremental.boundary",
82 engine_name: "omena-incremental",
83 invalidation_model: "stableNodeId+inputDigest+dependencyPropagation",
84 node_identity: vec!["id", "digest", "dependencyIds"],
85 dirty_reasons: vec![
86 "newNode",
87 "inputDigestChanged",
88 "dependencySetChanged",
89 "dependencyDirty",
90 ],
91 ready_surfaces: vec![
92 "incrementalGraphInput",
93 "incrementalSnapshot",
94 "incrementalComputationPlan",
95 ],
96 }
97}
98
99pub fn snapshot_from_graph_input(input: &IncrementalGraphInputV0) -> IncrementalSnapshotV0 {
100 IncrementalSnapshotV0 {
101 schema_version: "0",
102 product: "omena-incremental.snapshot",
103 revision: input.revision,
104 nodes: normalized_snapshot_nodes(input),
105 }
106}
107
108pub fn plan_incremental_computation(
109 input: &IncrementalGraphInputV0,
110 previous: Option<&IncrementalSnapshotV0>,
111) -> IncrementalComputationPlanV0 {
112 let normalized_nodes = normalized_snapshot_nodes(input);
113 let previous_by_id = previous
114 .map(|snapshot| {
115 snapshot
116 .nodes
117 .iter()
118 .map(|node| (node.id.as_str(), node))
119 .collect::<BTreeMap<_, _>>()
120 })
121 .unwrap_or_default();
122 let current_ids = normalized_nodes
123 .iter()
124 .map(|node| node.id.as_str())
125 .collect::<BTreeSet<_>>();
126 let removed_node_count = previous_by_id
127 .keys()
128 .filter(|id| !current_ids.contains(**id))
129 .count();
130 let mut dirty_ids = BTreeSet::<String>::new();
131 let mut nodes = normalized_nodes
132 .into_iter()
133 .map(|node| {
134 let mut reasons = Vec::new();
135 match previous_by_id.get(node.id.as_str()) {
136 None => reasons.push("newNode"),
137 Some(previous_node) => {
138 if previous_node.digest != node.digest {
139 reasons.push("inputDigestChanged");
140 }
141 if previous_node.dependency_ids != node.dependency_ids {
142 reasons.push("dependencySetChanged");
143 }
144 }
145 }
146 if !reasons.is_empty() {
147 dirty_ids.insert(node.id.clone());
148 }
149
150 IncrementalComputationNodeV0 {
151 id: node.id,
152 digest: node.digest,
153 dependency_ids: node.dependency_ids,
154 dirty: !reasons.is_empty(),
155 reasons,
156 }
157 })
158 .collect::<Vec<_>>();
159
160 propagate_dependency_dirty(&mut nodes, &mut dirty_ids);
161
162 IncrementalComputationPlanV0 {
163 schema_version: "0",
164 product: "omena-incremental.computation-plan",
165 revision: input.revision,
166 node_count: nodes.len(),
167 dirty_node_count: nodes.iter().filter(|node| node.dirty).count(),
168 changed_input_count: nodes
169 .iter()
170 .filter(|node| node.reasons.contains(&"inputDigestChanged"))
171 .count(),
172 new_node_count: nodes
173 .iter()
174 .filter(|node| node.reasons.contains(&"newNode"))
175 .count(),
176 removed_node_count,
177 dependency_dirty_count: nodes
178 .iter()
179 .filter(|node| node.reasons.contains(&"dependencyDirty"))
180 .count(),
181 nodes,
182 }
183}
184
185fn normalized_snapshot_nodes(input: &IncrementalGraphInputV0) -> Vec<IncrementalSnapshotNodeV0> {
186 let mut nodes = input
187 .nodes
188 .iter()
189 .map(|node| IncrementalSnapshotNodeV0 {
190 id: node.id.clone(),
191 digest: node.digest.clone(),
192 dependency_ids: normalized_ids(&node.dependency_ids),
193 })
194 .collect::<Vec<_>>();
195 nodes.sort_by(|left, right| left.id.cmp(&right.id));
196 nodes
197}
198
199fn normalized_ids(ids: &[String]) -> Vec<String> {
200 ids.iter()
201 .cloned()
202 .collect::<BTreeSet<_>>()
203 .into_iter()
204 .collect()
205}
206
207fn propagate_dependency_dirty(
208 nodes: &mut [IncrementalComputationNodeV0],
209 dirty_ids: &mut BTreeSet<String>,
210) {
211 loop {
212 let mut changed = false;
213 for node in nodes.iter_mut() {
214 if node.dirty {
215 continue;
216 }
217 if node
218 .dependency_ids
219 .iter()
220 .any(|dependency_id| dirty_ids.contains(dependency_id))
221 {
222 node.dirty = true;
223 node.reasons.push("dependencyDirty");
224 dirty_ids.insert(node.id.clone());
225 changed = true;
226 }
227 }
228
229 if !changed {
230 break;
231 }
232 }
233}
234
235#[cfg(test)]
236mod tests {
237 use super::{
238 IncrementalGraphInputV0, IncrementalNodeInputV0, IncrementalRevisionV0,
239 plan_incremental_computation, snapshot_from_graph_input,
240 summarize_omena_incremental_boundary,
241 };
242
243 #[test]
244 fn summarizes_incremental_boundary() {
245 let summary = summarize_omena_incremental_boundary();
246
247 assert_eq!(summary.product, "omena-incremental.boundary");
248 assert!(summary.dirty_reasons.contains(&"dependencyDirty"));
249 }
250
251 #[test]
252 fn first_plan_marks_all_nodes_dirty() {
253 let input = sample_input("a:v1", "b:v1", 1);
254 let plan = plan_incremental_computation(&input, None);
255
256 assert_eq!(plan.product, "omena-incremental.computation-plan");
257 assert_eq!(plan.node_count, 2);
258 assert_eq!(plan.dirty_node_count, 2);
259 assert_eq!(plan.new_node_count, 2);
260 }
261
262 #[test]
263 fn unchanged_second_plan_marks_nodes_clean() {
264 let input = sample_input("a:v1", "b:v1", 1);
265 let snapshot = snapshot_from_graph_input(&input);
266 let next_input = sample_input("a:v1", "b:v1", 2);
267 let plan = plan_incremental_computation(&next_input, Some(&snapshot));
268
269 assert_eq!(plan.dirty_node_count, 0);
270 assert_eq!(plan.changed_input_count, 0);
271 }
272
273 #[test]
274 fn changed_dependency_marks_dependent_dirty() {
275 let input = sample_input("a:v1", "b:v1", 1);
276 let snapshot = snapshot_from_graph_input(&input);
277 let next_input = sample_input("a:v2", "b:v1", 2);
278 let plan = plan_incremental_computation(&next_input, Some(&snapshot));
279
280 assert_eq!(plan.changed_input_count, 1);
281 assert_eq!(plan.dependency_dirty_count, 1);
282 assert_eq!(node_reasons(&plan, "a"), vec!["inputDigestChanged"]);
283 assert_eq!(node_reasons(&plan, "b"), vec!["dependencyDirty"]);
284 }
285
286 fn sample_input(a_digest: &str, b_digest: &str, revision: u64) -> IncrementalGraphInputV0 {
287 IncrementalGraphInputV0 {
288 revision: IncrementalRevisionV0 { value: revision },
289 nodes: vec![
290 IncrementalNodeInputV0 {
291 id: "b".to_string(),
292 digest: b_digest.to_string(),
293 dependency_ids: vec!["a".to_string()],
294 },
295 IncrementalNodeInputV0 {
296 id: "a".to_string(),
297 digest: a_digest.to_string(),
298 dependency_ids: Vec::new(),
299 },
300 ],
301 }
302 }
303
304 fn node_reasons(plan: &super::IncrementalComputationPlanV0, id: &str) -> Vec<&'static str> {
305 plan.nodes
306 .iter()
307 .find(|node| node.id == id)
308 .map(|node| node.reasons.clone())
309 .unwrap_or_default()
310 }
311}