Skip to main content

omena_incremental/
lib.rs

use std::collections::{BTreeMap, BTreeSet};

use serde::Serialize;
4
5#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
6#[serde(rename_all = "camelCase")]
7pub struct OmenaIncrementalBoundarySummaryV0 {
8    pub schema_version: &'static str,
9    pub product: &'static str,
10    pub engine_name: &'static str,
11    pub invalidation_model: &'static str,
12    pub node_identity: Vec<&'static str>,
13    pub dirty_reasons: Vec<&'static str>,
14    pub ready_surfaces: Vec<&'static str>,
15}
16
/// Limit applied by `IncrementalCancellationRegistryV0::default()`.
pub const DEFAULT_INCREMENTAL_CANCELLATION_LIMIT: usize = 128;
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
20#[serde(rename_all = "camelCase")]
21pub struct IncrementalRevisionV0 {
22    pub value: u64,
23}
24
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub struct IncrementalGraphInputV0 {
27    pub revision: IncrementalRevisionV0,
28    pub nodes: Vec<IncrementalNodeInputV0>,
29}
30
/// One node of an [`IncrementalGraphInputV0`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IncrementalNodeInputV0 {
    /// Identifier used to match this node against the previous snapshot.
    pub id: String,
    /// Digest of the node's input; a differing digest marks the node dirty.
    pub digest: String,
    /// Ids of the nodes this node depends on; duplicates and ordering are
    /// normalized away before comparison.
    pub dependency_ids: Vec<String>,
}
37
38#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
39#[serde(rename_all = "camelCase")]
40pub struct IncrementalSnapshotV0 {
41    pub schema_version: &'static str,
42    pub product: &'static str,
43    pub revision: IncrementalRevisionV0,
44    pub nodes: Vec<IncrementalSnapshotNodeV0>,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
48#[serde(rename_all = "camelCase")]
49pub struct IncrementalSnapshotNodeV0 {
50    pub id: String,
51    pub digest: String,
52    pub dependency_ids: Vec<String>,
53}
54
55#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
56#[serde(rename_all = "camelCase")]
57pub struct IncrementalComputationPlanV0 {
58    pub schema_version: &'static str,
59    pub product: &'static str,
60    pub revision: IncrementalRevisionV0,
61    pub node_count: usize,
62    pub dirty_node_count: usize,
63    pub changed_input_count: usize,
64    pub new_node_count: usize,
65    pub removed_node_count: usize,
66    pub dependency_dirty_count: usize,
67    pub nodes: Vec<IncrementalComputationNodeV0>,
68}
69
70#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
71#[serde(rename_all = "camelCase")]
72pub struct IncrementalComputationNodeV0 {
73    pub id: String,
74    pub digest: String,
75    pub dependency_ids: Vec<String>,
76    pub dirty: bool,
77    pub reasons: Vec<&'static str>,
78}
79
80#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
81#[serde(rename_all = "camelCase")]
82pub struct IncrementalCancellationSnapshotV0 {
83    pub schema_version: &'static str,
84    pub product: &'static str,
85    pub cancelled_request_count: usize,
86    pub cancelled_request_ids: Vec<String>,
87}
88
/// Bounded set of request ids that have been cancelled but not yet consumed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IncrementalCancellationRegistryV0 {
    /// Maximum number of ids kept before the set is reset; at least 1 when
    /// built through `with_limit`.
    limit: usize,
    /// Ids recorded by `cancel` and not yet removed by `take_cancelled`.
    cancelled_request_ids: BTreeSet<String>,
}
94
95pub fn summarize_omena_incremental_boundary() -> OmenaIncrementalBoundarySummaryV0 {
96    OmenaIncrementalBoundarySummaryV0 {
97        schema_version: "0",
98        product: "omena-incremental.boundary",
99        engine_name: "omena-incremental",
100        invalidation_model: "stableNodeId+inputDigest+dependencyPropagation",
101        node_identity: vec!["id", "digest", "dependencyIds"],
102        dirty_reasons: vec![
103            "newNode",
104            "inputDigestChanged",
105            "dependencySetChanged",
106            "dependencyDirty",
107        ],
108        ready_surfaces: vec![
109            "incrementalGraphInput",
110            "incrementalSnapshot",
111            "incrementalComputationPlan",
112            "incrementalCancellationRegistry",
113        ],
114    }
115}
116
117pub fn snapshot_from_graph_input(input: &IncrementalGraphInputV0) -> IncrementalSnapshotV0 {
118    IncrementalSnapshotV0 {
119        schema_version: "0",
120        product: "omena-incremental.snapshot",
121        revision: input.revision,
122        nodes: normalized_snapshot_nodes(input),
123    }
124}
125
126pub fn plan_incremental_computation(
127    input: &IncrementalGraphInputV0,
128    previous: Option<&IncrementalSnapshotV0>,
129) -> IncrementalComputationPlanV0 {
130    let normalized_nodes = normalized_snapshot_nodes(input);
131    let previous_by_id = previous
132        .map(|snapshot| {
133            snapshot
134                .nodes
135                .iter()
136                .map(|node| (node.id.as_str(), node))
137                .collect::<BTreeMap<_, _>>()
138        })
139        .unwrap_or_default();
140    let current_ids = normalized_nodes
141        .iter()
142        .map(|node| node.id.as_str())
143        .collect::<BTreeSet<_>>();
144    let removed_node_count = previous_by_id
145        .keys()
146        .filter(|id| !current_ids.contains(**id))
147        .count();
148    let mut dirty_ids = BTreeSet::<String>::new();
149    let mut nodes = normalized_nodes
150        .into_iter()
151        .map(|node| {
152            let mut reasons = Vec::new();
153            match previous_by_id.get(node.id.as_str()) {
154                None => reasons.push("newNode"),
155                Some(previous_node) => {
156                    if previous_node.digest != node.digest {
157                        reasons.push("inputDigestChanged");
158                    }
159                    if previous_node.dependency_ids != node.dependency_ids {
160                        reasons.push("dependencySetChanged");
161                    }
162                }
163            }
164            if !reasons.is_empty() {
165                dirty_ids.insert(node.id.clone());
166            }
167
168            IncrementalComputationNodeV0 {
169                id: node.id,
170                digest: node.digest,
171                dependency_ids: node.dependency_ids,
172                dirty: !reasons.is_empty(),
173                reasons,
174            }
175        })
176        .collect::<Vec<_>>();
177
178    propagate_dependency_dirty(&mut nodes, &mut dirty_ids);
179
180    IncrementalComputationPlanV0 {
181        schema_version: "0",
182        product: "omena-incremental.computation-plan",
183        revision: input.revision,
184        node_count: nodes.len(),
185        dirty_node_count: nodes.iter().filter(|node| node.dirty).count(),
186        changed_input_count: nodes
187            .iter()
188            .filter(|node| node.reasons.contains(&"inputDigestChanged"))
189            .count(),
190        new_node_count: nodes
191            .iter()
192            .filter(|node| node.reasons.contains(&"newNode"))
193            .count(),
194        removed_node_count,
195        dependency_dirty_count: nodes
196            .iter()
197            .filter(|node| node.reasons.contains(&"dependencyDirty"))
198            .count(),
199        nodes,
200    }
201}
202
203fn normalized_snapshot_nodes(input: &IncrementalGraphInputV0) -> Vec<IncrementalSnapshotNodeV0> {
204    let mut nodes = input
205        .nodes
206        .iter()
207        .map(|node| IncrementalSnapshotNodeV0 {
208            id: node.id.clone(),
209            digest: node.digest.clone(),
210            dependency_ids: normalized_ids(&node.dependency_ids),
211        })
212        .collect::<Vec<_>>();
213    nodes.sort_by(|left, right| left.id.cmp(&right.id));
214    nodes
215}
216
/// Returns `ids` sorted ascending with duplicates removed.
fn normalized_ids(ids: &[String]) -> Vec<String> {
    let mut unique = ids.to_vec();
    // Equal strings are indistinguishable, so the unstable sort is safe here.
    unique.sort_unstable();
    unique.dedup();
    unique
}
224
225fn propagate_dependency_dirty(
226    nodes: &mut [IncrementalComputationNodeV0],
227    dirty_ids: &mut BTreeSet<String>,
228) {
229    loop {
230        let mut changed = false;
231        for node in nodes.iter_mut() {
232            if node.dirty {
233                continue;
234            }
235            if node
236                .dependency_ids
237                .iter()
238                .any(|dependency_id| dirty_ids.contains(dependency_id))
239            {
240                node.dirty = true;
241                node.reasons.push("dependencyDirty");
242                dirty_ids.insert(node.id.clone());
243                changed = true;
244            }
245        }
246
247        if !changed {
248            break;
249        }
250    }
251}
252
253impl Default for IncrementalCancellationRegistryV0 {
254    fn default() -> Self {
255        Self::with_limit(DEFAULT_INCREMENTAL_CANCELLATION_LIMIT)
256    }
257}
258
259impl IncrementalCancellationRegistryV0 {
260    pub fn with_limit(limit: usize) -> Self {
261        Self {
262            limit: limit.max(1),
263            cancelled_request_ids: BTreeSet::new(),
264        }
265    }
266
267    pub fn cancel(&mut self, request_id: impl Into<String>) {
268        if self.cancelled_request_ids.len() >= self.limit {
269            self.cancelled_request_ids.clear();
270        }
271        self.cancelled_request_ids.insert(request_id.into());
272    }
273
274    pub fn take_cancelled(&mut self, request_id: &str) -> bool {
275        self.cancelled_request_ids.remove(request_id)
276    }
277
278    pub fn len(&self) -> usize {
279        self.cancelled_request_ids.len()
280    }
281
282    pub fn is_empty(&self) -> bool {
283        self.cancelled_request_ids.is_empty()
284    }
285
286    pub fn snapshot(&self) -> IncrementalCancellationSnapshotV0 {
287        IncrementalCancellationSnapshotV0 {
288            schema_version: "0",
289            product: "omena-incremental.cancellation-registry",
290            cancelled_request_count: self.cancelled_request_ids.len(),
291            cancelled_request_ids: self.cancelled_request_ids.iter().cloned().collect(),
292        }
293    }
294}
295
#[cfg(test)]
mod tests {
    use super::{
        IncrementalCancellationRegistryV0, IncrementalGraphInputV0, IncrementalNodeInputV0,
        IncrementalRevisionV0, plan_incremental_computation, snapshot_from_graph_input,
        summarize_omena_incremental_boundary,
    };

    #[test]
    fn summarizes_incremental_boundary() {
        let summary = summarize_omena_incremental_boundary();

        assert_eq!(summary.product, "omena-incremental.boundary");
        assert!(summary.dirty_reasons.contains(&"dependencyDirty"));
        assert!(
            summary
                .ready_surfaces
                .contains(&"incrementalCancellationRegistry")
        );
    }

    #[test]
    fn first_plan_marks_all_nodes_dirty() {
        let input = sample_input("a:v1", "b:v1", 1);
        let plan = plan_incremental_computation(&input, None);

        assert_eq!(plan.product, "omena-incremental.computation-plan");
        assert_eq!(plan.node_count, 2);
        assert_eq!(plan.dirty_node_count, 2);
        assert_eq!(plan.new_node_count, 2);
    }

    #[test]
    fn unchanged_second_plan_marks_nodes_clean() {
        let input = sample_input("a:v1", "b:v1", 1);
        let snapshot = snapshot_from_graph_input(&input);
        let next_input = sample_input("a:v1", "b:v1", 2);
        let plan = plan_incremental_computation(&next_input, Some(&snapshot));

        assert_eq!(plan.dirty_node_count, 0);
        assert_eq!(plan.changed_input_count, 0);
    }

    #[test]
    fn changed_dependency_marks_dependent_dirty() {
        let input = sample_input("a:v1", "b:v1", 1);
        let snapshot = snapshot_from_graph_input(&input);
        let next_input = sample_input("a:v2", "b:v1", 2);
        let plan = plan_incremental_computation(&next_input, Some(&snapshot));

        assert_eq!(plan.changed_input_count, 1);
        assert_eq!(plan.dependency_dirty_count, 1);
        assert_eq!(node_reasons(&plan, "a"), vec!["inputDigestChanged"]);
        assert_eq!(node_reasons(&plan, "b"), vec!["dependencyDirty"]);
    }

    #[test]
    fn cancellation_registry_tracks_and_consumes_request_ids() {
        let mut registry = IncrementalCancellationRegistryV0::with_limit(4);

        registry.cancel("s:hover-1");

        assert_eq!(registry.len(), 1);
        assert!(registry.take_cancelled("s:hover-1"));
        assert!(!registry.take_cancelled("s:hover-1"));
        assert!(registry.is_empty());
    }

    #[test]
    fn cancellation_registry_bounds_stale_cancelled_requests() {
        let mut registry = IncrementalCancellationRegistryV0::with_limit(2);

        registry.cancel("n:1");
        registry.cancel("n:2");
        registry.cancel("n:3");

        let snapshot = registry.snapshot();
        assert_eq!(snapshot.product, "omena-incremental.cancellation-registry");
        assert_eq!(snapshot.cancelled_request_ids, vec!["n:3"]);
    }

    /// Two-node graph in which `b` depends on `a`; the nodes are listed out
    /// of id order on purpose so normalization has something to sort.
    fn sample_input(a_digest: &str, b_digest: &str, revision: u64) -> IncrementalGraphInputV0 {
        IncrementalGraphInputV0 {
            revision: IncrementalRevisionV0 { value: revision },
            nodes: vec![
                IncrementalNodeInputV0 {
                    id: "b".to_string(),
                    digest: b_digest.to_string(),
                    dependency_ids: vec!["a".to_string()],
                },
                IncrementalNodeInputV0 {
                    id: "a".to_string(),
                    digest: a_digest.to_string(),
                    dependency_ids: Vec::new(),
                },
            ],
        }
    }

    /// Reasons recorded for the node with the given id, or an empty list if
    /// the plan has no such node.
    fn node_reasons(plan: &super::IncrementalComputationPlanV0, id: &str) -> Vec<&'static str> {
        plan.nodes
            .iter()
            .find(|node| node.id == id)
            .map(|node| node.reasons.clone())
            .unwrap_or_default()
    }
}