1use std::collections::{BTreeMap, BTreeSet};
2
3use serde::Serialize;
4
5#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
6#[serde(rename_all = "camelCase")]
7pub struct OmenaIncrementalBoundarySummaryV0 {
8 pub schema_version: &'static str,
9 pub product: &'static str,
10 pub engine_name: &'static str,
11 pub invalidation_model: &'static str,
12 pub node_identity: Vec<&'static str>,
13 pub dirty_reasons: Vec<&'static str>,
14 pub ready_surfaces: Vec<&'static str>,
15}
16
17pub const DEFAULT_INCREMENTAL_CANCELLATION_LIMIT: usize = 128;
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
20#[serde(rename_all = "camelCase")]
21pub struct IncrementalRevisionV0 {
22 pub value: u64,
23}
24
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub struct IncrementalGraphInputV0 {
27 pub revision: IncrementalRevisionV0,
28 pub nodes: Vec<IncrementalNodeInputV0>,
29}
30
31#[derive(Debug, Clone, PartialEq, Eq)]
32pub struct IncrementalNodeInputV0 {
33 pub id: String,
34 pub digest: String,
35 pub dependency_ids: Vec<String>,
36}
37
38#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
39#[serde(rename_all = "camelCase")]
40pub struct IncrementalSnapshotV0 {
41 pub schema_version: &'static str,
42 pub product: &'static str,
43 pub revision: IncrementalRevisionV0,
44 pub nodes: Vec<IncrementalSnapshotNodeV0>,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
48#[serde(rename_all = "camelCase")]
49pub struct IncrementalSnapshotNodeV0 {
50 pub id: String,
51 pub digest: String,
52 pub dependency_ids: Vec<String>,
53}
54
55#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
56#[serde(rename_all = "camelCase")]
57pub struct IncrementalComputationPlanV0 {
58 pub schema_version: &'static str,
59 pub product: &'static str,
60 pub revision: IncrementalRevisionV0,
61 pub node_count: usize,
62 pub dirty_node_count: usize,
63 pub changed_input_count: usize,
64 pub new_node_count: usize,
65 pub removed_node_count: usize,
66 pub dependency_dirty_count: usize,
67 pub nodes: Vec<IncrementalComputationNodeV0>,
68}
69
70#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
71#[serde(rename_all = "camelCase")]
72pub struct IncrementalComputationNodeV0 {
73 pub id: String,
74 pub digest: String,
75 pub dependency_ids: Vec<String>,
76 pub dirty: bool,
77 pub reasons: Vec<&'static str>,
78}
79
80#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
81#[serde(rename_all = "camelCase")]
82pub struct IncrementalCancellationSnapshotV0 {
83 pub schema_version: &'static str,
84 pub product: &'static str,
85 pub cancelled_request_count: usize,
86 pub cancelled_request_ids: Vec<String>,
87}
88
89#[derive(Debug, Clone, PartialEq, Eq)]
90pub struct IncrementalCancellationRegistryV0 {
91 limit: usize,
92 cancelled_request_ids: BTreeSet<String>,
93}
94
95pub fn summarize_omena_incremental_boundary() -> OmenaIncrementalBoundarySummaryV0 {
96 OmenaIncrementalBoundarySummaryV0 {
97 schema_version: "0",
98 product: "omena-incremental.boundary",
99 engine_name: "omena-incremental",
100 invalidation_model: "stableNodeId+inputDigest+dependencyPropagation",
101 node_identity: vec!["id", "digest", "dependencyIds"],
102 dirty_reasons: vec![
103 "newNode",
104 "inputDigestChanged",
105 "dependencySetChanged",
106 "dependencyDirty",
107 ],
108 ready_surfaces: vec![
109 "incrementalGraphInput",
110 "incrementalSnapshot",
111 "incrementalComputationPlan",
112 "incrementalCancellationRegistry",
113 ],
114 }
115}
116
117pub fn snapshot_from_graph_input(input: &IncrementalGraphInputV0) -> IncrementalSnapshotV0 {
118 IncrementalSnapshotV0 {
119 schema_version: "0",
120 product: "omena-incremental.snapshot",
121 revision: input.revision,
122 nodes: normalized_snapshot_nodes(input),
123 }
124}
125
126pub fn plan_incremental_computation(
127 input: &IncrementalGraphInputV0,
128 previous: Option<&IncrementalSnapshotV0>,
129) -> IncrementalComputationPlanV0 {
130 let normalized_nodes = normalized_snapshot_nodes(input);
131 let previous_by_id = previous
132 .map(|snapshot| {
133 snapshot
134 .nodes
135 .iter()
136 .map(|node| (node.id.as_str(), node))
137 .collect::<BTreeMap<_, _>>()
138 })
139 .unwrap_or_default();
140 let current_ids = normalized_nodes
141 .iter()
142 .map(|node| node.id.as_str())
143 .collect::<BTreeSet<_>>();
144 let removed_node_count = previous_by_id
145 .keys()
146 .filter(|id| !current_ids.contains(**id))
147 .count();
148 let mut dirty_ids = BTreeSet::<String>::new();
149 let mut nodes = normalized_nodes
150 .into_iter()
151 .map(|node| {
152 let mut reasons = Vec::new();
153 match previous_by_id.get(node.id.as_str()) {
154 None => reasons.push("newNode"),
155 Some(previous_node) => {
156 if previous_node.digest != node.digest {
157 reasons.push("inputDigestChanged");
158 }
159 if previous_node.dependency_ids != node.dependency_ids {
160 reasons.push("dependencySetChanged");
161 }
162 }
163 }
164 if !reasons.is_empty() {
165 dirty_ids.insert(node.id.clone());
166 }
167
168 IncrementalComputationNodeV0 {
169 id: node.id,
170 digest: node.digest,
171 dependency_ids: node.dependency_ids,
172 dirty: !reasons.is_empty(),
173 reasons,
174 }
175 })
176 .collect::<Vec<_>>();
177
178 propagate_dependency_dirty(&mut nodes, &mut dirty_ids);
179
180 IncrementalComputationPlanV0 {
181 schema_version: "0",
182 product: "omena-incremental.computation-plan",
183 revision: input.revision,
184 node_count: nodes.len(),
185 dirty_node_count: nodes.iter().filter(|node| node.dirty).count(),
186 changed_input_count: nodes
187 .iter()
188 .filter(|node| node.reasons.contains(&"inputDigestChanged"))
189 .count(),
190 new_node_count: nodes
191 .iter()
192 .filter(|node| node.reasons.contains(&"newNode"))
193 .count(),
194 removed_node_count,
195 dependency_dirty_count: nodes
196 .iter()
197 .filter(|node| node.reasons.contains(&"dependencyDirty"))
198 .count(),
199 nodes,
200 }
201}
202
203fn normalized_snapshot_nodes(input: &IncrementalGraphInputV0) -> Vec<IncrementalSnapshotNodeV0> {
204 let mut nodes = input
205 .nodes
206 .iter()
207 .map(|node| IncrementalSnapshotNodeV0 {
208 id: node.id.clone(),
209 digest: node.digest.clone(),
210 dependency_ids: normalized_ids(&node.dependency_ids),
211 })
212 .collect::<Vec<_>>();
213 nodes.sort_by(|left, right| left.id.cmp(&right.id));
214 nodes
215}
216
217fn normalized_ids(ids: &[String]) -> Vec<String> {
218 ids.iter()
219 .cloned()
220 .collect::<BTreeSet<_>>()
221 .into_iter()
222 .collect()
223}
224
225fn propagate_dependency_dirty(
226 nodes: &mut [IncrementalComputationNodeV0],
227 dirty_ids: &mut BTreeSet<String>,
228) {
229 loop {
230 let mut changed = false;
231 for node in nodes.iter_mut() {
232 if node.dirty {
233 continue;
234 }
235 if node
236 .dependency_ids
237 .iter()
238 .any(|dependency_id| dirty_ids.contains(dependency_id))
239 {
240 node.dirty = true;
241 node.reasons.push("dependencyDirty");
242 dirty_ids.insert(node.id.clone());
243 changed = true;
244 }
245 }
246
247 if !changed {
248 break;
249 }
250 }
251}
252
253impl Default for IncrementalCancellationRegistryV0 {
254 fn default() -> Self {
255 Self::with_limit(DEFAULT_INCREMENTAL_CANCELLATION_LIMIT)
256 }
257}
258
259impl IncrementalCancellationRegistryV0 {
260 pub fn with_limit(limit: usize) -> Self {
261 Self {
262 limit: limit.max(1),
263 cancelled_request_ids: BTreeSet::new(),
264 }
265 }
266
267 pub fn cancel(&mut self, request_id: impl Into<String>) {
268 if self.cancelled_request_ids.len() >= self.limit {
269 self.cancelled_request_ids.clear();
270 }
271 self.cancelled_request_ids.insert(request_id.into());
272 }
273
274 pub fn take_cancelled(&mut self, request_id: &str) -> bool {
275 self.cancelled_request_ids.remove(request_id)
276 }
277
278 pub fn len(&self) -> usize {
279 self.cancelled_request_ids.len()
280 }
281
282 pub fn is_empty(&self) -> bool {
283 self.cancelled_request_ids.is_empty()
284 }
285
286 pub fn snapshot(&self) -> IncrementalCancellationSnapshotV0 {
287 IncrementalCancellationSnapshotV0 {
288 schema_version: "0",
289 product: "omena-incremental.cancellation-registry",
290 cancelled_request_count: self.cancelled_request_ids.len(),
291 cancelled_request_ids: self.cancelled_request_ids.iter().cloned().collect(),
292 }
293 }
294}
295
296#[cfg(test)]
297mod tests {
298 use super::{
299 IncrementalCancellationRegistryV0, IncrementalGraphInputV0, IncrementalNodeInputV0,
300 IncrementalRevisionV0, plan_incremental_computation, snapshot_from_graph_input,
301 summarize_omena_incremental_boundary,
302 };
303
304 #[test]
305 fn summarizes_incremental_boundary() {
306 let summary = summarize_omena_incremental_boundary();
307
308 assert_eq!(summary.product, "omena-incremental.boundary");
309 assert!(summary.dirty_reasons.contains(&"dependencyDirty"));
310 assert!(
311 summary
312 .ready_surfaces
313 .contains(&"incrementalCancellationRegistry")
314 );
315 }
316
317 #[test]
318 fn first_plan_marks_all_nodes_dirty() {
319 let input = sample_input("a:v1", "b:v1", 1);
320 let plan = plan_incremental_computation(&input, None);
321
322 assert_eq!(plan.product, "omena-incremental.computation-plan");
323 assert_eq!(plan.node_count, 2);
324 assert_eq!(plan.dirty_node_count, 2);
325 assert_eq!(plan.new_node_count, 2);
326 }
327
328 #[test]
329 fn unchanged_second_plan_marks_nodes_clean() {
330 let input = sample_input("a:v1", "b:v1", 1);
331 let snapshot = snapshot_from_graph_input(&input);
332 let next_input = sample_input("a:v1", "b:v1", 2);
333 let plan = plan_incremental_computation(&next_input, Some(&snapshot));
334
335 assert_eq!(plan.dirty_node_count, 0);
336 assert_eq!(plan.changed_input_count, 0);
337 }
338
339 #[test]
340 fn changed_dependency_marks_dependent_dirty() {
341 let input = sample_input("a:v1", "b:v1", 1);
342 let snapshot = snapshot_from_graph_input(&input);
343 let next_input = sample_input("a:v2", "b:v1", 2);
344 let plan = plan_incremental_computation(&next_input, Some(&snapshot));
345
346 assert_eq!(plan.changed_input_count, 1);
347 assert_eq!(plan.dependency_dirty_count, 1);
348 assert_eq!(node_reasons(&plan, "a"), vec!["inputDigestChanged"]);
349 assert_eq!(node_reasons(&plan, "b"), vec!["dependencyDirty"]);
350 }
351
352 #[test]
353 fn cancellation_registry_tracks_and_consumes_request_ids() {
354 let mut registry = IncrementalCancellationRegistryV0::with_limit(4);
355
356 registry.cancel("s:hover-1");
357
358 assert_eq!(registry.len(), 1);
359 assert!(registry.take_cancelled("s:hover-1"));
360 assert!(!registry.take_cancelled("s:hover-1"));
361 assert!(registry.is_empty());
362 }
363
364 #[test]
365 fn cancellation_registry_bounds_stale_cancelled_requests() {
366 let mut registry = IncrementalCancellationRegistryV0::with_limit(2);
367
368 registry.cancel("n:1");
369 registry.cancel("n:2");
370 registry.cancel("n:3");
371
372 let snapshot = registry.snapshot();
373 assert_eq!(snapshot.product, "omena-incremental.cancellation-registry");
374 assert_eq!(snapshot.cancelled_request_ids, vec!["n:3"]);
375 }
376
377 fn sample_input(a_digest: &str, b_digest: &str, revision: u64) -> IncrementalGraphInputV0 {
378 IncrementalGraphInputV0 {
379 revision: IncrementalRevisionV0 { value: revision },
380 nodes: vec![
381 IncrementalNodeInputV0 {
382 id: "b".to_string(),
383 digest: b_digest.to_string(),
384 dependency_ids: vec!["a".to_string()],
385 },
386 IncrementalNodeInputV0 {
387 id: "a".to_string(),
388 digest: a_digest.to_string(),
389 dependency_ids: Vec::new(),
390 },
391 ],
392 }
393 }
394
395 fn node_reasons(plan: &super::IncrementalComputationPlanV0, id: &str) -> Vec<&'static str> {
396 plan.nodes
397 .iter()
398 .find(|node| node.id == id)
399 .map(|node| node.reasons.clone())
400 .unwrap_or_default()
401 }
402}