Skip to main content

omena_incremental/
lib.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use serde::Serialize;
4
5#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
6#[serde(rename_all = "camelCase")]
7pub struct OmenaIncrementalBoundarySummaryV0 {
8    pub schema_version: &'static str,
9    pub product: &'static str,
10    pub engine_name: &'static str,
11    pub invalidation_model: &'static str,
12    pub node_identity: Vec<&'static str>,
13    pub dirty_reasons: Vec<&'static str>,
14    pub ready_surfaces: Vec<&'static str>,
15}
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
18#[serde(rename_all = "camelCase")]
19pub struct IncrementalRevisionV0 {
20    pub value: u64,
21}
22
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct IncrementalGraphInputV0 {
25    pub revision: IncrementalRevisionV0,
26    pub nodes: Vec<IncrementalNodeInputV0>,
27}
28
/// One node of an [`IncrementalGraphInputV0`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IncrementalNodeInputV0 {
    /// Identifier used to match this node against a previous snapshot.
    pub id: String,
    /// Digest of the node's input; a differing digest marks the node dirty.
    pub digest: String,
    /// Ids of the nodes this node depends on. Order and duplicates are
    /// irrelevant: the list is sorted and de-duplicated during normalization.
    pub dependency_ids: Vec<String>,
}
35
36#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
37#[serde(rename_all = "camelCase")]
38pub struct IncrementalSnapshotV0 {
39    pub schema_version: &'static str,
40    pub product: &'static str,
41    pub revision: IncrementalRevisionV0,
42    pub nodes: Vec<IncrementalSnapshotNodeV0>,
43}
44
45#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
46#[serde(rename_all = "camelCase")]
47pub struct IncrementalSnapshotNodeV0 {
48    pub id: String,
49    pub digest: String,
50    pub dependency_ids: Vec<String>,
51}
52
53#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
54#[serde(rename_all = "camelCase")]
55pub struct IncrementalComputationPlanV0 {
56    pub schema_version: &'static str,
57    pub product: &'static str,
58    pub revision: IncrementalRevisionV0,
59    pub node_count: usize,
60    pub dirty_node_count: usize,
61    pub changed_input_count: usize,
62    pub new_node_count: usize,
63    pub removed_node_count: usize,
64    pub dependency_dirty_count: usize,
65    pub nodes: Vec<IncrementalComputationNodeV0>,
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
69#[serde(rename_all = "camelCase")]
70pub struct IncrementalComputationNodeV0 {
71    pub id: String,
72    pub digest: String,
73    pub dependency_ids: Vec<String>,
74    pub dirty: bool,
75    pub reasons: Vec<&'static str>,
76}
77
78pub fn summarize_omena_incremental_boundary() -> OmenaIncrementalBoundarySummaryV0 {
79    OmenaIncrementalBoundarySummaryV0 {
80        schema_version: "0",
81        product: "omena-incremental.boundary",
82        engine_name: "omena-incremental",
83        invalidation_model: "stableNodeId+inputDigest+dependencyPropagation",
84        node_identity: vec!["id", "digest", "dependencyIds"],
85        dirty_reasons: vec![
86            "newNode",
87            "inputDigestChanged",
88            "dependencySetChanged",
89            "dependencyDirty",
90        ],
91        ready_surfaces: vec![
92            "incrementalGraphInput",
93            "incrementalSnapshot",
94            "incrementalComputationPlan",
95        ],
96    }
97}
98
99pub fn snapshot_from_graph_input(input: &IncrementalGraphInputV0) -> IncrementalSnapshotV0 {
100    IncrementalSnapshotV0 {
101        schema_version: "0",
102        product: "omena-incremental.snapshot",
103        revision: input.revision,
104        nodes: normalized_snapshot_nodes(input),
105    }
106}
107
108pub fn plan_incremental_computation(
109    input: &IncrementalGraphInputV0,
110    previous: Option<&IncrementalSnapshotV0>,
111) -> IncrementalComputationPlanV0 {
112    let normalized_nodes = normalized_snapshot_nodes(input);
113    let previous_by_id = previous
114        .map(|snapshot| {
115            snapshot
116                .nodes
117                .iter()
118                .map(|node| (node.id.as_str(), node))
119                .collect::<BTreeMap<_, _>>()
120        })
121        .unwrap_or_default();
122    let current_ids = normalized_nodes
123        .iter()
124        .map(|node| node.id.as_str())
125        .collect::<BTreeSet<_>>();
126    let removed_node_count = previous_by_id
127        .keys()
128        .filter(|id| !current_ids.contains(**id))
129        .count();
130    let mut dirty_ids = BTreeSet::<String>::new();
131    let mut nodes = normalized_nodes
132        .into_iter()
133        .map(|node| {
134            let mut reasons = Vec::new();
135            match previous_by_id.get(node.id.as_str()) {
136                None => reasons.push("newNode"),
137                Some(previous_node) => {
138                    if previous_node.digest != node.digest {
139                        reasons.push("inputDigestChanged");
140                    }
141                    if previous_node.dependency_ids != node.dependency_ids {
142                        reasons.push("dependencySetChanged");
143                    }
144                }
145            }
146            if !reasons.is_empty() {
147                dirty_ids.insert(node.id.clone());
148            }
149
150            IncrementalComputationNodeV0 {
151                id: node.id,
152                digest: node.digest,
153                dependency_ids: node.dependency_ids,
154                dirty: !reasons.is_empty(),
155                reasons,
156            }
157        })
158        .collect::<Vec<_>>();
159
160    propagate_dependency_dirty(&mut nodes, &mut dirty_ids);
161
162    IncrementalComputationPlanV0 {
163        schema_version: "0",
164        product: "omena-incremental.computation-plan",
165        revision: input.revision,
166        node_count: nodes.len(),
167        dirty_node_count: nodes.iter().filter(|node| node.dirty).count(),
168        changed_input_count: nodes
169            .iter()
170            .filter(|node| node.reasons.contains(&"inputDigestChanged"))
171            .count(),
172        new_node_count: nodes
173            .iter()
174            .filter(|node| node.reasons.contains(&"newNode"))
175            .count(),
176        removed_node_count,
177        dependency_dirty_count: nodes
178            .iter()
179            .filter(|node| node.reasons.contains(&"dependencyDirty"))
180            .count(),
181        nodes,
182    }
183}
184
185fn normalized_snapshot_nodes(input: &IncrementalGraphInputV0) -> Vec<IncrementalSnapshotNodeV0> {
186    let mut nodes = input
187        .nodes
188        .iter()
189        .map(|node| IncrementalSnapshotNodeV0 {
190            id: node.id.clone(),
191            digest: node.digest.clone(),
192            dependency_ids: normalized_ids(&node.dependency_ids),
193        })
194        .collect::<Vec<_>>();
195    nodes.sort_by(|left, right| left.id.cmp(&right.id));
196    nodes
197}
198
/// Returns the given ids sorted in ascending order with duplicates removed.
fn normalized_ids(ids: &[String]) -> Vec<String> {
    let mut unique = ids.to_vec();
    // Equal strings are indistinguishable, so an unstable sort is sufficient.
    unique.sort_unstable();
    // After sorting, duplicates are adjacent and `dedup` drops them all.
    unique.dedup();
    unique
}
206
207fn propagate_dependency_dirty(
208    nodes: &mut [IncrementalComputationNodeV0],
209    dirty_ids: &mut BTreeSet<String>,
210) {
211    loop {
212        let mut changed = false;
213        for node in nodes.iter_mut() {
214            if node.dirty {
215                continue;
216            }
217            if node
218                .dependency_ids
219                .iter()
220                .any(|dependency_id| dirty_ids.contains(dependency_id))
221            {
222                node.dirty = true;
223                node.reasons.push("dependencyDirty");
224                dirty_ids.insert(node.id.clone());
225                changed = true;
226            }
227        }
228
229        if !changed {
230            break;
231        }
232    }
233}
234
#[cfg(test)]
mod tests {
    use super::{
        IncrementalComputationPlanV0, IncrementalGraphInputV0, IncrementalNodeInputV0,
        IncrementalRevisionV0, plan_incremental_computation, snapshot_from_graph_input,
        summarize_omena_incremental_boundary,
    };

    #[test]
    fn summarizes_incremental_boundary() {
        let summary = summarize_omena_incremental_boundary();

        assert_eq!(summary.product, "omena-incremental.boundary");
        assert!(summary.dirty_reasons.contains(&"dependencyDirty"));
    }

    #[test]
    fn first_plan_marks_all_nodes_dirty() {
        let input = sample_input("a:v1", "b:v1", 1);
        let plan = plan_incremental_computation(&input, None);

        assert_eq!(plan.product, "omena-incremental.computation-plan");
        assert_eq!(plan.node_count, 2);
        assert_eq!(plan.dirty_node_count, 2);
        assert_eq!(plan.new_node_count, 2);
    }

    #[test]
    fn unchanged_second_plan_marks_nodes_clean() {
        let input = sample_input("a:v1", "b:v1", 1);
        let snapshot = snapshot_from_graph_input(&input);
        let next_input = sample_input("a:v1", "b:v1", 2);
        let plan = plan_incremental_computation(&next_input, Some(&snapshot));

        assert_eq!(plan.dirty_node_count, 0);
        assert_eq!(plan.changed_input_count, 0);
    }

    #[test]
    fn changed_dependency_marks_dependent_dirty() {
        let input = sample_input("a:v1", "b:v1", 1);
        let snapshot = snapshot_from_graph_input(&input);
        let next_input = sample_input("a:v2", "b:v1", 2);
        let plan = plan_incremental_computation(&next_input, Some(&snapshot));

        assert_eq!(plan.changed_input_count, 1);
        assert_eq!(plan.dependency_dirty_count, 1);
        assert_eq!(node_reasons(&plan, "a"), vec!["inputDigestChanged"]);
        assert_eq!(node_reasons(&plan, "b"), vec!["dependencyDirty"]);
    }

    #[test]
    fn removed_node_is_counted_without_dirtying_survivors() {
        let snapshot = snapshot_from_graph_input(&sample_input("a:v1", "b:v1", 1));
        let next_input = graph_input(2, vec![node_input("a", "a:v1", &[])]);
        let plan = plan_incremental_computation(&next_input, Some(&snapshot));

        assert_eq!(plan.node_count, 1);
        assert_eq!(plan.removed_node_count, 1);
        assert_eq!(plan.dirty_node_count, 0);
    }

    #[test]
    fn changed_dependency_set_marks_node_dirty() {
        let snapshot = snapshot_from_graph_input(&sample_input("a:v1", "b:v1", 1));
        // "b" keeps its digest but no longer depends on "a".
        let next_input = graph_input(
            2,
            vec![node_input("a", "a:v1", &[]), node_input("b", "b:v1", &[])],
        );
        let plan = plan_incremental_computation(&next_input, Some(&snapshot));

        assert_eq!(plan.dirty_node_count, 1);
        assert_eq!(plan.changed_input_count, 0);
        assert_eq!(node_reasons(&plan, "b"), vec!["dependencySetChanged"]);
        assert!(node_reasons(&plan, "a").is_empty());
    }

    #[test]
    fn dirtiness_propagates_across_multiple_hops() {
        // "a" depends on "b", which depends on "c": the id order is the
        // reverse of the dependency order, so propagation needs several hops.
        let snapshot = snapshot_from_graph_input(&chain_input("c:v1", 1));
        let plan = plan_incremental_computation(&chain_input("c:v2", 2), Some(&snapshot));

        assert_eq!(plan.dirty_node_count, 3);
        assert_eq!(plan.changed_input_count, 1);
        assert_eq!(plan.dependency_dirty_count, 2);
        assert_eq!(node_reasons(&plan, "a"), vec!["dependencyDirty"]);
        assert_eq!(node_reasons(&plan, "b"), vec!["dependencyDirty"]);
        assert_eq!(node_reasons(&plan, "c"), vec!["inputDigestChanged"]);
    }

    #[test]
    fn snapshot_sorts_nodes_and_dedups_dependency_ids() {
        let input = graph_input(
            1,
            vec![
                node_input("b", "b:v1", &["c", "a", "c"]),
                node_input("a", "a:v1", &[]),
            ],
        );
        let snapshot = snapshot_from_graph_input(&input);

        let ids: Vec<&str> = snapshot.nodes.iter().map(|node| node.id.as_str()).collect();
        assert_eq!(ids, vec!["a", "b"]);
        assert_eq!(
            snapshot.nodes[1].dependency_ids,
            vec!["a".to_string(), "c".to_string()]
        );
        assert_eq!(snapshot.revision, IncrementalRevisionV0 { value: 1 });
    }

    /// Two-node graph, deliberately listed out of id order: "b" depends on "a".
    fn sample_input(a_digest: &str, b_digest: &str, revision: u64) -> IncrementalGraphInputV0 {
        graph_input(
            revision,
            vec![
                node_input("b", b_digest, &["a"]),
                node_input("a", a_digest, &[]),
            ],
        )
    }

    /// Three-node chain: "a" depends on "b", which depends on "c".
    fn chain_input(c_digest: &str, revision: u64) -> IncrementalGraphInputV0 {
        graph_input(
            revision,
            vec![
                node_input("a", "a:v1", &["b"]),
                node_input("b", "b:v1", &["c"]),
                node_input("c", c_digest, &[]),
            ],
        )
    }

    fn graph_input(revision: u64, nodes: Vec<IncrementalNodeInputV0>) -> IncrementalGraphInputV0 {
        IncrementalGraphInputV0 {
            revision: IncrementalRevisionV0 { value: revision },
            nodes,
        }
    }

    fn node_input(id: &str, digest: &str, dependency_ids: &[&str]) -> IncrementalNodeInputV0 {
        IncrementalNodeInputV0 {
            id: id.to_string(),
            digest: digest.to_string(),
            dependency_ids: dependency_ids.iter().map(|dep| dep.to_string()).collect(),
        }
    }

    /// Reasons recorded for the node `id`, or an empty list if it is absent.
    fn node_reasons(plan: &IncrementalComputationPlanV0, id: &str) -> Vec<&'static str> {
        plan.nodes
            .iter()
            .find(|node| node.id == id)
            .map(|node| node.reasons.clone())
            .unwrap_or_default()
    }
}