1use std::collections::{BTreeMap, BTreeSet};
9
10use salsa::Setter;
11use serde::Serialize;
12
13mod frame_invalidation;
14pub use frame_invalidation::*;
15
16#[cfg(test)]
17use std::sync::atomic::{AtomicUsize, Ordering};
18
19#[cfg(test)]
20static SALSA_DIGEST_QUERY_RUNS: AtomicUsize = AtomicUsize::new(0);
21#[cfg(test)]
22static SALSA_DEPENDENCY_QUERY_RUNS: AtomicUsize = AtomicUsize::new(0);
23
24#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
25#[serde(rename_all = "camelCase")]
26pub struct OmenaIncrementalBoundarySummaryV0 {
27 pub schema_version: &'static str,
28 pub product: &'static str,
29 pub engine_name: &'static str,
30 pub invalidation_model: &'static str,
31 pub query_model: &'static str,
32 pub dependency_propagation_policy: &'static str,
33 pub maximum_dependency_propagation_iterations: &'static str,
34 pub node_identity: Vec<&'static str>,
35 pub dirty_reasons: Vec<&'static str>,
36 pub ready_surfaces: Vec<&'static str>,
37}
38
39pub const DEFAULT_INCREMENTAL_CANCELLATION_LIMIT: usize = 128;
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
42#[serde(rename_all = "camelCase")]
43pub struct IncrementalRevisionV0 {
44 pub value: u64,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq)]
48pub struct IncrementalGraphInputV0 {
49 pub revision: IncrementalRevisionV0,
50 pub nodes: Vec<IncrementalNodeInputV0>,
51}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub struct IncrementalNodeInputV0 {
55 pub id: String,
56 pub digest: String,
57 pub dependency_ids: Vec<String>,
58}
59
60#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
61#[serde(rename_all = "camelCase")]
62pub struct IncrementalSnapshotV0 {
63 pub schema_version: &'static str,
64 pub product: &'static str,
65 pub revision: IncrementalRevisionV0,
66 pub nodes: Vec<IncrementalSnapshotNodeV0>,
67}
68
69#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
70#[serde(rename_all = "camelCase")]
71pub struct IncrementalSnapshotNodeV0 {
72 pub id: String,
73 pub digest: String,
74 pub dependency_ids: Vec<String>,
75}
76
77#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
78#[serde(rename_all = "camelCase")]
79pub struct IncrementalComputationPlanV0 {
80 pub schema_version: &'static str,
81 pub product: &'static str,
82 pub revision: IncrementalRevisionV0,
83 pub node_count: usize,
84 pub dirty_node_count: usize,
85 pub changed_input_count: usize,
86 pub new_node_count: usize,
87 pub removed_node_count: usize,
88 pub dependency_dirty_count: usize,
89 pub nodes: Vec<IncrementalComputationNodeV0>,
90}
91
92#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
93#[serde(rename_all = "camelCase")]
94pub struct IncrementalComputationNodeV0 {
95 pub id: String,
96 pub digest: String,
97 pub dependency_ids: Vec<String>,
98 pub dirty: bool,
99 pub reasons: Vec<&'static str>,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
103#[serde(rename_all = "camelCase")]
104pub struct IncrementalCancellationSnapshotV0 {
105 pub schema_version: &'static str,
106 pub product: &'static str,
107 pub cancelled_request_count: usize,
108 pub cancelled_request_ids: Vec<String>,
109}
110
111#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
112#[serde(rename_all = "camelCase")]
113pub struct IncrementalDatabaseUpdateV0 {
114 pub schema_version: &'static str,
115 pub product: &'static str,
116 pub incremental_plan: IncrementalComputationPlanV0,
117 pub datalog_rule_evaluator: DatalogRuleEvaluatorV0,
118 pub next_snapshot: IncrementalSnapshotV0,
119}
120
121#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
122#[serde(rename_all = "camelCase")]
123pub struct IncrementalConsistencyFuzzCaseV0 {
124 pub seed: u64,
125 pub node_count: usize,
126 pub changed_node_index: Option<usize>,
127}
128
129#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
130#[serde(rename_all = "camelCase")]
131pub struct IncrementalConsistencyFuzzResultV0 {
132 pub seed: u64,
133 pub node_count: usize,
134 pub changed_node_id: Option<String>,
135 pub dirty_node_count: usize,
136 pub expected_dirty_node_count: usize,
137 pub passed: bool,
138}
139
140#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
141#[serde(rename_all = "camelCase")]
142pub struct IncrementalFuzzSeedReportV0 {
143 pub schema_version: &'static str,
144 pub product: &'static str,
145 pub case_count: usize,
146 pub passed_count: usize,
147 pub failed_count: usize,
148 pub results: Vec<IncrementalConsistencyFuzzResultV0>,
149}
150
151#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
152#[serde(rename_all = "camelCase")]
153pub struct DatalogRuleEvaluatorRuleV0 {
154 pub name: &'static str,
155 pub head: &'static str,
156 pub body: Vec<&'static str>,
157 pub source: &'static str,
158}
159
160#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
161#[serde(rename_all = "camelCase")]
162pub struct DatalogRuleEvaluatorV0 {
168 pub schema_version: &'static str,
169 pub product: &'static str,
170 pub evaluator_kind: &'static str,
171 pub substrate: &'static str,
172 pub external_host_ready: bool,
173 pub revision: IncrementalRevisionV0,
174 pub rule_count: usize,
175 pub relation_count: usize,
176 pub input_node_count: usize,
177 pub dirty_node_count: usize,
178 pub derived_node_count: usize,
179 pub iteration_limit: usize,
180 pub fixed_point_reached: bool,
181 pub relations: Vec<&'static str>,
182 pub rules: Vec<DatalogRuleEvaluatorRuleV0>,
183 pub incremental_plan: IncrementalComputationPlanV0,
184}
185
186#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
187#[serde(rename_all = "camelCase")]
188pub struct IncrementalLayerEvidenceV0 {
189 pub schema_version: &'static str,
190 pub product: &'static str,
191 pub claim_level: &'static str,
192 pub invalidation_layer: &'static str,
193 pub real_invalidation_evidence_ready: bool,
194 pub fuzz_evidence_ready: bool,
195 pub salsa_reuse_evidence_ready: bool,
196 pub datalog_contract_evidence_ready: bool,
197 pub benchmark_surface_ready: bool,
198 pub performance_benchmark_claim_ready: bool,
199 pub external_datalog_host_ready: bool,
200 pub dbsp_zset_claim_ready: bool,
201 pub public_safety_claim_ready: bool,
202 pub benchmark_gate: &'static str,
203 pub benchmark_evidence_level: &'static str,
204 pub supported_claims: Vec<&'static str>,
205 pub deferred_claims: Vec<&'static str>,
206 pub boundary: OmenaIncrementalBoundarySummaryV0,
207 pub fuzz_report: IncrementalFuzzSeedReportV0,
208 pub sample_update: IncrementalDatabaseUpdateV0,
209}
210
211#[derive(Debug, Clone, PartialEq, Eq)]
212pub struct IncrementalCancellationRegistryV0 {
213 limit: usize,
214 cancelled_request_ids: BTreeSet<String>,
215}
216
217#[salsa::input(debug)]
218pub struct SalsaIncrementalNodeInputV0 {
219 #[returns(ref)]
220 id: String,
221 #[returns(ref)]
222 digest: String,
223 #[returns(ref)]
224 dependency_ids: Vec<String>,
225}
226
227#[derive(Default)]
228pub struct OmenaIncrementalDatabaseV0 {
229 db: salsa::DatabaseImpl,
230 node_inputs_by_id: BTreeMap<String, SalsaIncrementalNodeInputV0>,
231 current_snapshot: Option<IncrementalSnapshotV0>,
232}
233
234pub fn summarize_omena_incremental_boundary() -> OmenaIncrementalBoundarySummaryV0 {
235 OmenaIncrementalBoundarySummaryV0 {
236 schema_version: "0",
237 product: "omena-incremental.boundary",
238 engine_name: "omena-incremental",
239 invalidation_model: "stableNodeId+inputDigest+dependencyPropagation",
240 query_model: "salsaInput+trackedQueryFieldGranularReuse",
241 dependency_propagation_policy: "boundedFixedPointOverNormalizedNodeSet",
242 maximum_dependency_propagation_iterations: "nodeCount+1",
243 node_identity: vec!["id", "digest", "dependencyIds"],
244 dirty_reasons: vec![
245 "newNode",
246 "inputDigestChanged",
247 "dependencySetChanged",
248 "dependencyDirty",
249 ],
250 ready_surfaces: vec![
251 "incrementalGraphInput",
252 "incrementalSnapshot",
253 "incrementalComputationPlan",
254 "incrementalCancellationRegistry",
255 "datalogRuleEvaluatorV0",
256 "salsaPersistentDatabase",
257 "salsaTrackedNodeSnapshotQuery",
258 "salsaFieldGranularReuse",
259 "salsaPlanAndSnapshotUpdate",
260 "dependencyFixedPointEarlyExit",
261 ],
262 }
263}
264
265pub fn summarize_datalog_rule_evaluator_v0(
266 input: &IncrementalGraphInputV0,
267 previous: Option<&IncrementalSnapshotV0>,
268) -> DatalogRuleEvaluatorV0 {
269 let incremental_plan = plan_incremental_computation(input, previous);
270 let relations = vec![
271 "node(id,digest)",
272 "previousNode(id,digest)",
273 "dependsOn(nodeId,dependencyId)",
274 "changedInput(nodeId)",
275 "dirty(nodeId)",
276 ];
277 let rules = vec![
278 DatalogRuleEvaluatorRuleV0 {
279 name: "newNodeIsDirty",
280 head: "dirty(Node)",
281 body: vec!["node(Node,Digest)", "not previousNode(Node,_)"],
282 source: "omena-incremental.computation-plan",
283 },
284 DatalogRuleEvaluatorRuleV0 {
285 name: "changedDigestIsDirty",
286 head: "dirty(Node)",
287 body: vec![
288 "node(Node,Digest)",
289 "previousNode(Node,PreviousDigest)",
290 "Digest != PreviousDigest",
291 ],
292 source: "omena-incremental.computation-plan",
293 },
294 DatalogRuleEvaluatorRuleV0 {
295 name: "changedDependencySetIsDirty",
296 head: "dirty(Node)",
297 body: vec!["dependsOn(Node,_)", "previousDependencySetDiffers(Node)"],
298 source: "omena-incremental.computation-plan",
299 },
300 DatalogRuleEvaluatorRuleV0 {
301 name: "dependencyDirtyFixedPoint",
302 head: "dirty(Node)",
303 body: vec!["dependsOn(Node,Dependency)", "dirty(Dependency)"],
304 source: "omena-incremental.dependency-propagation",
305 },
306 ];
307 let iteration_limit = dependency_propagation_iteration_limit(input.nodes.len());
308 let fixed_point_reached = dirty_set_is_dependency_closed(&incremental_plan);
309
310 DatalogRuleEvaluatorV0 {
311 schema_version: "0",
312 product: "omena-incremental.datalog-rule-evaluator",
313 evaluator_kind: "typedContractOverIncrementalFixedPoint",
314 substrate: "omena-incremental.salsa-backed-computation-plan",
315 external_host_ready: false,
316 revision: input.revision,
317 rule_count: rules.len(),
318 relation_count: relations.len(),
319 input_node_count: input.nodes.len(),
320 dirty_node_count: incremental_plan.dirty_node_count,
321 derived_node_count: incremental_plan.dependency_dirty_count,
322 iteration_limit,
323 fixed_point_reached,
324 relations,
325 rules,
326 incremental_plan,
327 }
328}
329
330pub fn snapshot_from_graph_input(input: &IncrementalGraphInputV0) -> IncrementalSnapshotV0 {
331 IncrementalSnapshotV0 {
332 schema_version: "0",
333 product: "omena-incremental.snapshot",
334 revision: input.revision,
335 nodes: normalized_snapshot_nodes(input),
336 }
337}
338
339pub fn plan_incremental_computation(
340 input: &IncrementalGraphInputV0,
341 previous: Option<&IncrementalSnapshotV0>,
342) -> IncrementalComputationPlanV0 {
343 let normalized_nodes = normalized_snapshot_nodes(input);
344 let previous_by_id = previous
345 .map(|snapshot| {
346 snapshot
347 .nodes
348 .iter()
349 .map(|node| (node.id.as_str(), node))
350 .collect::<BTreeMap<_, _>>()
351 })
352 .unwrap_or_default();
353 let current_ids = normalized_nodes
354 .iter()
355 .map(|node| node.id.as_str())
356 .collect::<BTreeSet<_>>();
357 let removed_node_count = previous_by_id
358 .keys()
359 .filter(|id| !current_ids.contains(**id))
360 .count();
361 let mut dirty_ids = BTreeSet::<String>::new();
362 let mut nodes = normalized_nodes
363 .into_iter()
364 .map(|node| {
365 let mut reasons = Vec::new();
366 match previous_by_id.get(node.id.as_str()) {
367 None => reasons.push("newNode"),
368 Some(previous_node) => {
369 if previous_node.digest != node.digest {
370 reasons.push("inputDigestChanged");
371 }
372 if previous_node.dependency_ids != node.dependency_ids {
373 reasons.push("dependencySetChanged");
374 }
375 }
376 }
377 if !reasons.is_empty() {
378 dirty_ids.insert(node.id.clone());
379 }
380
381 IncrementalComputationNodeV0 {
382 id: node.id,
383 digest: node.digest,
384 dependency_ids: node.dependency_ids,
385 dirty: !reasons.is_empty(),
386 reasons,
387 }
388 })
389 .collect::<Vec<_>>();
390
391 propagate_dependency_dirty(&mut nodes, &mut dirty_ids);
392
393 IncrementalComputationPlanV0 {
394 schema_version: "0",
395 product: "omena-incremental.computation-plan",
396 revision: input.revision,
397 node_count: nodes.len(),
398 dirty_node_count: nodes.iter().filter(|node| node.dirty).count(),
399 changed_input_count: nodes
400 .iter()
401 .filter(|node| node.reasons.contains(&"inputDigestChanged"))
402 .count(),
403 new_node_count: nodes
404 .iter()
405 .filter(|node| node.reasons.contains(&"newNode"))
406 .count(),
407 removed_node_count,
408 dependency_dirty_count: nodes
409 .iter()
410 .filter(|node| node.reasons.contains(&"dependencyDirty"))
411 .count(),
412 nodes,
413 }
414}
415
416pub fn run_incremental_consistency_fuzz_case(
417 case: IncrementalConsistencyFuzzCaseV0,
418) -> IncrementalConsistencyFuzzResultV0 {
419 let node_count = case.node_count.clamp(1, 64);
420 let previous_input = generated_incremental_fuzz_graph(case.seed, node_count, None);
421 let previous_snapshot = snapshot_from_graph_input(&previous_input);
422 let changed_index = case
423 .changed_node_index
424 .map(|index| index.min(node_count.saturating_sub(1)));
425 let next_input = generated_incremental_fuzz_graph(case.seed, node_count, changed_index);
426 let plan = plan_incremental_computation(&next_input, Some(&previous_snapshot));
427 let changed_node_id = changed_index.map(fuzz_node_id);
428 let expected_dirty_ids = changed_node_id
429 .as_ref()
430 .map(|changed_id| transitive_dependents(&next_input, changed_id))
431 .unwrap_or_default();
432 let actual_dirty_ids = plan
433 .nodes
434 .iter()
435 .filter(|node| node.dirty)
436 .map(|node| node.id.clone())
437 .collect::<BTreeSet<_>>();
438 let expected_dirty_node_count = expected_dirty_ids.len();
439 let passed = actual_dirty_ids == expected_dirty_ids
440 && plan.dirty_node_count == expected_dirty_node_count
441 && plan.changed_input_count == usize::from(changed_node_id.is_some());
442
443 IncrementalConsistencyFuzzResultV0 {
444 seed: case.seed,
445 node_count,
446 changed_node_id,
447 dirty_node_count: plan.dirty_node_count,
448 expected_dirty_node_count,
449 passed,
450 }
451}
452
453pub fn run_incremental_fuzz_seed_corpus() -> IncrementalFuzzSeedReportV0 {
454 let seeds = [1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233];
455 let results = seeds
456 .into_iter()
457 .enumerate()
458 .map(|(index, seed)| {
459 run_incremental_consistency_fuzz_case(IncrementalConsistencyFuzzCaseV0 {
460 seed,
461 node_count: index + 1,
462 changed_node_index: if index % 4 == 0 {
463 None
464 } else {
465 Some(index / 2)
466 },
467 })
468 })
469 .collect::<Vec<_>>();
470 let passed_count = results.iter().filter(|result| result.passed).count();
471 let case_count = results.len();
472
473 IncrementalFuzzSeedReportV0 {
474 schema_version: "0",
475 product: "omena-incremental.fuzz-seed-corpus",
476 case_count,
477 passed_count,
478 failed_count: case_count - passed_count,
479 results,
480 }
481}
482
483pub fn summarize_incremental_layer_evidence_v0() -> IncrementalLayerEvidenceV0 {
484 let boundary = summarize_omena_incremental_boundary();
485 let fuzz_report = run_incremental_fuzz_seed_corpus();
486 let mut database = OmenaIncrementalDatabaseV0::default();
487 let previous_input = IncrementalGraphInputV0 {
488 revision: IncrementalRevisionV0 { value: 1 },
489 nodes: vec![
490 IncrementalNodeInputV0 {
491 id: "source".to_string(),
492 digest: "source:v1".to_string(),
493 dependency_ids: Vec::new(),
494 },
495 IncrementalNodeInputV0 {
496 id: "style".to_string(),
497 digest: "style:v1".to_string(),
498 dependency_ids: vec!["source".to_string()],
499 },
500 ],
501 };
502 database.plan_and_upsert_graph_input(&previous_input);
503 let sample_update = database.plan_and_upsert_graph_input(&IncrementalGraphInputV0 {
504 revision: IncrementalRevisionV0 { value: 2 },
505 nodes: vec![
506 IncrementalNodeInputV0 {
507 id: "source".to_string(),
508 digest: "source:v2".to_string(),
509 dependency_ids: Vec::new(),
510 },
511 IncrementalNodeInputV0 {
512 id: "style".to_string(),
513 digest: "style:v1".to_string(),
514 dependency_ids: vec!["source".to_string()],
515 },
516 ],
517 });
518
519 IncrementalLayerEvidenceV0 {
520 schema_version: "0",
521 product: "omena-incremental.layer-evidence",
522 claim_level: "m6IncrementalLayerEvidenceOnly",
523 invalidation_layer: "stableNodeIdDigestDependencyGraph",
524 real_invalidation_evidence_ready: sample_update.incremental_plan.changed_input_count == 1
525 && sample_update.incremental_plan.dependency_dirty_count == 1
526 && sample_update.incremental_plan.dirty_node_count == 2,
527 fuzz_evidence_ready: fuzz_report.failed_count == 0,
528 salsa_reuse_evidence_ready: boundary
529 .ready_surfaces
530 .contains(&"salsaTrackedNodeSnapshotQuery"),
531 datalog_contract_evidence_ready: sample_update.datalog_rule_evaluator.fixed_point_reached
532 && !sample_update.datalog_rule_evaluator.external_host_ready,
533 benchmark_surface_ready: true,
534 performance_benchmark_claim_ready: false,
535 external_datalog_host_ready: false,
536 dbsp_zset_claim_ready: false,
537 public_safety_claim_ready: false,
538 benchmark_gate: "rust/z5-performance-baseline-readiness",
539 benchmark_evidence_level: "configuredCriterionSurfaceNoTimingClaim",
540 supported_claims: vec![
541 "stable node id plus digest invalidation",
542 "dependency dirty-set fixed point",
543 "Salsa-backed tracked node snapshot reuse",
544 "fuzzed dirty-set invariant corpus",
545 "Datalog-shaped audit contract over the incremental plan",
546 ],
547 deferred_claims: vec![
548 "DBSP runtime",
549 "Z-set differential dataflow semantics",
550 "external Datalog host execution",
551 "performance superiority from local timing data",
552 "public safety claim",
553 ],
554 boundary,
555 fuzz_report,
556 sample_update,
557 }
558}
559
560#[salsa::tracked(returns(clone))]
561pub fn summarize_salsa_incremental_node_snapshot(
562 db: &dyn salsa::Database,
563 node: SalsaIncrementalNodeInputV0,
564) -> IncrementalSnapshotNodeV0 {
565 IncrementalSnapshotNodeV0 {
566 id: node.id(db).clone(),
567 digest: node.digest(db).clone(),
568 dependency_ids: normalized_ids(node.dependency_ids(db)),
569 }
570}
571
572#[salsa::tracked(returns(clone))]
573pub fn read_salsa_incremental_node_digest(
574 db: &dyn salsa::Database,
575 node: SalsaIncrementalNodeInputV0,
576) -> String {
577 #[cfg(test)]
578 SALSA_DIGEST_QUERY_RUNS.fetch_add(1, Ordering::Relaxed);
579
580 node.digest(db).clone()
581}
582
583#[salsa::tracked(returns(clone))]
584pub fn read_salsa_incremental_node_dependency_ids(
585 db: &dyn salsa::Database,
586 node: SalsaIncrementalNodeInputV0,
587) -> Vec<String> {
588 #[cfg(test)]
589 SALSA_DEPENDENCY_QUERY_RUNS.fetch_add(1, Ordering::Relaxed);
590
591 normalized_ids(node.dependency_ids(db))
592}
593
594fn normalized_snapshot_nodes(input: &IncrementalGraphInputV0) -> Vec<IncrementalSnapshotNodeV0> {
595 let mut nodes = input
596 .nodes
597 .iter()
598 .map(|node| IncrementalSnapshotNodeV0 {
599 id: node.id.clone(),
600 digest: node.digest.clone(),
601 dependency_ids: normalized_ids(&node.dependency_ids),
602 })
603 .collect::<Vec<_>>();
604 nodes.sort_by(|left, right| left.id.cmp(&right.id));
605 nodes
606}
607
608fn normalized_ids(ids: &[String]) -> Vec<String> {
609 ids.iter()
610 .cloned()
611 .collect::<BTreeSet<_>>()
612 .into_iter()
613 .collect()
614}
615
616fn propagate_dependency_dirty(
617 nodes: &mut [IncrementalComputationNodeV0],
618 dirty_ids: &mut BTreeSet<String>,
619) {
620 let max_iterations = dependency_propagation_iteration_limit(nodes.len());
621 for _ in 0..max_iterations {
622 let mut changed = false;
623 for node in nodes.iter_mut() {
624 if node.dirty {
625 continue;
626 }
627 if node
628 .dependency_ids
629 .iter()
630 .any(|dependency_id| dirty_ids.contains(dependency_id))
631 {
632 node.dirty = true;
633 node.reasons.push("dependencyDirty");
634 dirty_ids.insert(node.id.clone());
635 changed = true;
636 }
637 }
638
639 if !changed {
640 break;
641 }
642 }
643}
644
645fn dirty_set_is_dependency_closed(plan: &IncrementalComputationPlanV0) -> bool {
646 let dirty_ids = plan
647 .nodes
648 .iter()
649 .filter(|node| node.dirty)
650 .map(|node| node.id.as_str())
651 .collect::<BTreeSet<_>>();
652 plan.nodes.iter().all(|node| {
653 node.dirty
654 || node
655 .dependency_ids
656 .iter()
657 .all(|dependency_id| !dirty_ids.contains(dependency_id.as_str()))
658 })
659}
660
661fn generated_incremental_fuzz_graph(
662 seed: u64,
663 node_count: usize,
664 changed_index: Option<usize>,
665) -> IncrementalGraphInputV0 {
666 let mut state = seed ^ 0xa076_1d64_78bd_642f;
667 let nodes = (0..node_count)
668 .map(|index| {
669 let id = fuzz_node_id(index);
670 let mut digest_seed = fuzz_next(&mut state);
671 if changed_index == Some(index) {
672 digest_seed ^= 0xffff_ffff_ffff_ffff;
673 }
674 let dependency_ids = (0..index)
675 .filter(|candidate| {
676 let divisor = ((*candidate + 2) as u64).max(2);
677 (seed + index as u64).is_multiple_of(divisor)
678 })
679 .map(fuzz_node_id)
680 .collect::<Vec<_>>();
681 IncrementalNodeInputV0 {
682 id,
683 digest: format!("digest-{index}-{digest_seed:016x}"),
684 dependency_ids,
685 }
686 })
687 .collect();
688
689 IncrementalGraphInputV0 {
690 revision: IncrementalRevisionV0 {
691 value: 1 + if changed_index.is_some() { 1 } else { 0 },
692 },
693 nodes,
694 }
695}
696
697fn transitive_dependents(input: &IncrementalGraphInputV0, changed_id: &str) -> BTreeSet<String> {
698 let mut dirty_ids = BTreeSet::from([changed_id.to_string()]);
699 let max_iterations = dependency_propagation_iteration_limit(input.nodes.len());
700 for _ in 0..max_iterations {
701 let mut changed = false;
702 for node in &input.nodes {
703 if dirty_ids.contains(&node.id) {
704 continue;
705 }
706 if node
707 .dependency_ids
708 .iter()
709 .any(|dependency_id| dirty_ids.contains(dependency_id))
710 {
711 changed = dirty_ids.insert(node.id.clone()) || changed;
712 }
713 }
714 if !changed {
715 break;
716 }
717 }
718 dirty_ids
719}
720
721fn dependency_propagation_iteration_limit(node_count: usize) -> usize {
722 node_count.saturating_add(1)
723}
724
725fn fuzz_node_id(index: usize) -> String {
726 format!("node-{index}")
727}
728
729fn fuzz_next(state: &mut u64) -> u64 {
730 *state = state
731 .wrapping_mul(6_364_136_223_846_793_005)
732 .wrapping_add(1_442_695_040_888_963_407);
733 *state
734}
735
736impl OmenaIncrementalDatabaseV0 {
737 pub fn salsa_database(&self) -> &salsa::DatabaseImpl {
738 &self.db
739 }
740
741 pub fn node_input(&self, id: &str) -> Option<SalsaIncrementalNodeInputV0> {
742 self.node_inputs_by_id.get(id).copied()
743 }
744
745 pub fn current_snapshot(&self) -> Option<&IncrementalSnapshotV0> {
746 self.current_snapshot.as_ref()
747 }
748
749 pub fn plan_and_upsert_graph_input(
750 &mut self,
751 input: &IncrementalGraphInputV0,
752 ) -> IncrementalDatabaseUpdateV0 {
753 let incremental_plan = plan_incremental_computation(input, self.current_snapshot.as_ref());
754 let datalog_rule_evaluator =
755 summarize_datalog_rule_evaluator_v0(input, self.current_snapshot.as_ref());
756 let next_snapshot = self.upsert_graph_input(input);
757 self.current_snapshot = Some(next_snapshot.clone());
758
759 IncrementalDatabaseUpdateV0 {
760 schema_version: "0",
761 product: "omena-incremental.salsa-database-update",
762 incremental_plan,
763 datalog_rule_evaluator,
764 next_snapshot,
765 }
766 }
767
768 pub fn upsert_graph_input(&mut self, input: &IncrementalGraphInputV0) -> IncrementalSnapshotV0 {
769 let normalized_nodes = normalized_snapshot_nodes(input);
770 let current_ids = normalized_nodes
771 .iter()
772 .map(|node| node.id.as_str())
773 .collect::<BTreeSet<_>>();
774 self.node_inputs_by_id
775 .retain(|id, _node| current_ids.contains(id.as_str()));
776
777 for node in &normalized_nodes {
778 self.upsert_node_input(node);
779 }
780
781 let nodes = self
782 .node_inputs_by_id
783 .values()
784 .copied()
785 .map(|node| summarize_salsa_incremental_node_snapshot(&self.db, node))
786 .collect::<Vec<_>>();
787
788 IncrementalSnapshotV0 {
789 schema_version: "0",
790 product: "omena-incremental.salsa-snapshot",
791 revision: input.revision,
792 nodes,
793 }
794 }
795
796 fn upsert_node_input(&mut self, node: &IncrementalSnapshotNodeV0) {
797 let Some(node_input) = self.node_inputs_by_id.get(node.id.as_str()).copied() else {
798 let node_input = SalsaIncrementalNodeInputV0::new(
799 &self.db,
800 node.id.clone(),
801 node.digest.clone(),
802 node.dependency_ids.clone(),
803 );
804 self.node_inputs_by_id.insert(node.id.clone(), node_input);
805 return;
806 };
807
808 if node_input.digest(&self.db).as_str() != node.digest.as_str() {
809 node_input.set_digest(&mut self.db).to(node.digest.clone());
810 }
811 if node_input.dependency_ids(&self.db).as_slice() != node.dependency_ids.as_slice() {
812 node_input
813 .set_dependency_ids(&mut self.db)
814 .to(node.dependency_ids.clone());
815 }
816 }
817}
818
819impl Default for IncrementalCancellationRegistryV0 {
820 fn default() -> Self {
821 Self::with_limit(DEFAULT_INCREMENTAL_CANCELLATION_LIMIT)
822 }
823}
824
825impl IncrementalCancellationRegistryV0 {
826 pub fn with_limit(limit: usize) -> Self {
827 Self {
828 limit: limit.max(1),
829 cancelled_request_ids: BTreeSet::new(),
830 }
831 }
832
833 pub fn cancel(&mut self, request_id: impl Into<String>) {
834 if self.cancelled_request_ids.len() >= self.limit {
835 self.cancelled_request_ids.clear();
836 }
837 self.cancelled_request_ids.insert(request_id.into());
838 }
839
840 pub fn take_cancelled(&mut self, request_id: &str) -> bool {
841 self.cancelled_request_ids.remove(request_id)
842 }
843
844 pub fn len(&self) -> usize {
845 self.cancelled_request_ids.len()
846 }
847
848 pub fn is_empty(&self) -> bool {
849 self.cancelled_request_ids.is_empty()
850 }
851
852 pub fn snapshot(&self) -> IncrementalCancellationSnapshotV0 {
853 IncrementalCancellationSnapshotV0 {
854 schema_version: "0",
855 product: "omena-incremental.cancellation-registry",
856 cancelled_request_count: self.cancelled_request_ids.len(),
857 cancelled_request_ids: self.cancelled_request_ids.iter().cloned().collect(),
858 }
859 }
860}
861
862#[cfg(test)]
863mod tests {
864 use super::{
865 IncrementalCancellationRegistryV0, IncrementalGraphInputV0, IncrementalNodeInputV0,
866 IncrementalRevisionV0, OmenaIncrementalDatabaseV0, SALSA_DEPENDENCY_QUERY_RUNS,
867 SALSA_DIGEST_QUERY_RUNS, plan_incremental_computation,
868 read_salsa_incremental_node_dependency_ids, read_salsa_incremental_node_digest,
869 snapshot_from_graph_input, summarize_datalog_rule_evaluator_v0,
870 summarize_incremental_layer_evidence_v0, summarize_omena_incremental_boundary,
871 };
872 use std::sync::atomic::Ordering;
873
874 #[test]
875 fn summarizes_incremental_boundary() {
876 let summary = summarize_omena_incremental_boundary();
877
878 assert_eq!(summary.product, "omena-incremental.boundary");
879 assert_eq!(
880 summary.query_model,
881 "salsaInput+trackedQueryFieldGranularReuse"
882 );
883 assert_eq!(
884 summary.dependency_propagation_policy,
885 "boundedFixedPointOverNormalizedNodeSet"
886 );
887 assert_eq!(
888 summary.maximum_dependency_propagation_iterations,
889 "nodeCount+1"
890 );
891 assert!(summary.dirty_reasons.contains(&"dependencyDirty"));
892 assert!(
893 summary
894 .ready_surfaces
895 .contains(&"incrementalCancellationRegistry")
896 );
897 assert!(summary.ready_surfaces.contains(&"datalogRuleEvaluatorV0"));
898 assert!(
899 summary
900 .ready_surfaces
901 .contains(&"salsaTrackedNodeSnapshotQuery")
902 );
903 assert!(
904 summary
905 .ready_surfaces
906 .contains(&"dependencyFixedPointEarlyExit")
907 );
908 }
909
910 #[test]
911 fn first_plan_marks_all_nodes_dirty() {
912 let input = sample_input("a:v1", "b:v1", 1);
913 let plan = plan_incremental_computation(&input, None);
914
915 assert_eq!(plan.product, "omena-incremental.computation-plan");
916 assert_eq!(plan.node_count, 2);
917 assert_eq!(plan.dirty_node_count, 2);
918 assert_eq!(plan.new_node_count, 2);
919 }
920
921 #[test]
922 fn unchanged_second_plan_marks_nodes_clean() {
923 let input = sample_input("a:v1", "b:v1", 1);
924 let snapshot = snapshot_from_graph_input(&input);
925 let next_input = sample_input("a:v1", "b:v1", 2);
926 let plan = plan_incremental_computation(&next_input, Some(&snapshot));
927
928 assert_eq!(plan.dirty_node_count, 0);
929 assert_eq!(plan.changed_input_count, 0);
930 }
931
932 #[test]
933 fn changed_dependency_marks_dependent_dirty() {
934 let input = sample_input("a:v1", "b:v1", 1);
935 let snapshot = snapshot_from_graph_input(&input);
936 let next_input = sample_input("a:v2", "b:v1", 2);
937 let plan = plan_incremental_computation(&next_input, Some(&snapshot));
938
939 assert_eq!(plan.changed_input_count, 1);
940 assert_eq!(plan.dependency_dirty_count, 1);
941 assert_eq!(node_reasons(&plan, "a"), vec!["inputDigestChanged"]);
942 assert_eq!(node_reasons(&plan, "b"), vec!["dependencyDirty"]);
943 }
944
945 #[test]
946 fn datalog_rule_evaluator_contract_matches_incremental_dirty_plan() {
947 let input = sample_input("a:v1", "b:v1", 1);
948 let snapshot = snapshot_from_graph_input(&input);
949 let next_input = sample_input("a:v2", "b:v1", 2);
950 let summary = summarize_datalog_rule_evaluator_v0(&next_input, Some(&snapshot));
951
952 assert_eq!(summary.schema_version, "0");
953 assert_eq!(summary.product, "omena-incremental.datalog-rule-evaluator");
954 assert_eq!(
955 summary.evaluator_kind,
956 "typedContractOverIncrementalFixedPoint"
957 );
958 assert_eq!(
959 summary.substrate,
960 "omena-incremental.salsa-backed-computation-plan"
961 );
962 assert!(!summary.external_host_ready);
963 assert_eq!(summary.rule_count, summary.rules.len());
964 assert_eq!(summary.relation_count, summary.relations.len());
965 assert_eq!(summary.input_node_count, 2);
966 assert_eq!(summary.dirty_node_count, 2);
967 assert_eq!(summary.derived_node_count, 1);
968 assert_eq!(summary.iteration_limit, 3);
969 assert!(summary.fixed_point_reached);
970 assert_eq!(summary.incremental_plan.changed_input_count, 1);
971 assert_eq!(summary.incremental_plan.dependency_dirty_count, 1);
972 assert!(summary.rules.iter().any(|rule| {
973 rule.name == "dependencyDirtyFixedPoint"
974 && rule.body == vec!["dependsOn(Node,Dependency)", "dirty(Dependency)"]
975 }));
976 }
977
978 #[test]
979 fn datalog_rule_evaluator_fixture_corpus_matches_incremental_fixed_point() {
980 for seed in [1, 2, 3, 5, 8, 13, 21, 34] {
981 let previous_input = super::generated_incremental_fuzz_graph(seed, 8, None);
982 let previous_snapshot = snapshot_from_graph_input(&previous_input);
983 let next_input = super::generated_incremental_fuzz_graph(seed, 8, Some(3));
984 let plan = plan_incremental_computation(&next_input, Some(&previous_snapshot));
985 let summary =
986 summarize_datalog_rule_evaluator_v0(&next_input, Some(&previous_snapshot));
987
988 assert_eq!(summary.incremental_plan, plan);
989 assert_eq!(summary.dirty_node_count, plan.dirty_node_count);
990 assert_eq!(summary.derived_node_count, plan.dependency_dirty_count);
991 assert!(summary.fixed_point_reached);
992 assert!(!summary.external_host_ready);
993 assert_eq!(summary.rule_count, 4);
994 assert_eq!(summary.relation_count, 5);
995 }
996 }
997
998 #[test]
999 fn cyclic_dependency_graph_uses_bounded_dirty_propagation() {
1000 let input = cyclic_input("a:v1", "b:v1", 1);
1001 let snapshot = snapshot_from_graph_input(&input);
1002 let next_input = cyclic_input("a:v2", "b:v1", 2);
1003 let plan = plan_incremental_computation(&next_input, Some(&snapshot));
1004
1005 assert_eq!(plan.changed_input_count, 1);
1006 assert_eq!(plan.dirty_node_count, 2);
1007 assert_eq!(node_reasons(&plan, "a"), vec!["inputDigestChanged"]);
1008 assert_eq!(node_reasons(&plan, "b"), vec!["dependencyDirty"]);
1009 assert_eq!(
1010 super::dependency_propagation_iteration_limit(input.nodes.len()),
1011 input.nodes.len() + 1
1012 );
1013 }
1014
1015 #[test]
1016 fn fuzz_seed_corpus_preserves_incremental_dirty_set_invariants() {
1017 let report = super::run_incremental_fuzz_seed_corpus();
1018
1019 assert_eq!(report.product, "omena-incremental.fuzz-seed-corpus");
1020 assert_eq!(report.failed_count, 0);
1021 assert_eq!(report.passed_count, report.case_count);
1022 assert!(
1023 report
1024 .results
1025 .iter()
1026 .any(|result| result.changed_node_id.is_none())
1027 );
1028 assert!(
1029 report
1030 .results
1031 .iter()
1032 .any(|result| result.expected_dirty_node_count > 1)
1033 );
1034 }
1035
1036 #[test]
1037 fn m6_incremental_layer_evidence_is_limited_to_real_invalidation_layer() {
1038 let evidence = summarize_incremental_layer_evidence_v0();
1039
1040 assert_eq!(evidence.schema_version, "0");
1041 assert_eq!(evidence.product, "omena-incremental.layer-evidence");
1042 assert_eq!(evidence.claim_level, "m6IncrementalLayerEvidenceOnly");
1043 assert_eq!(
1044 evidence.invalidation_layer,
1045 "stableNodeIdDigestDependencyGraph"
1046 );
1047 assert!(evidence.real_invalidation_evidence_ready);
1048 assert!(evidence.fuzz_evidence_ready);
1049 assert!(evidence.salsa_reuse_evidence_ready);
1050 assert!(evidence.datalog_contract_evidence_ready);
1051 assert!(evidence.benchmark_surface_ready);
1052 assert!(!evidence.performance_benchmark_claim_ready);
1053 assert!(!evidence.external_datalog_host_ready);
1054 assert!(!evidence.dbsp_zset_claim_ready);
1055 assert!(!evidence.public_safety_claim_ready);
1056 assert_eq!(
1057 evidence.benchmark_gate,
1058 "rust/z5-performance-baseline-readiness"
1059 );
1060 assert_eq!(evidence.fuzz_report.failed_count, 0);
1061 assert_eq!(
1062 evidence.sample_update.incremental_plan.changed_input_count,
1063 1
1064 );
1065 assert_eq!(
1066 evidence
1067 .sample_update
1068 .incremental_plan
1069 .dependency_dirty_count,
1070 1
1071 );
1072 assert!(
1073 evidence
1074 .supported_claims
1075 .contains(&"dependency dirty-set fixed point")
1076 );
1077 assert!(evidence.deferred_claims.contains(&"DBSP runtime"));
1078 assert!(
1079 evidence
1080 .deferred_claims
1081 .contains(&"Z-set differential dataflow semantics")
1082 );
1083 }
1084
1085 #[test]
1086 fn salsa_database_reuses_digest_query_when_only_dependencies_change() {
1087 SALSA_DIGEST_QUERY_RUNS.store(0, Ordering::Relaxed);
1088 SALSA_DEPENDENCY_QUERY_RUNS.store(0, Ordering::Relaxed);
1089
1090 let mut db = OmenaIncrementalDatabaseV0::default();
1091 let input = IncrementalGraphInputV0 {
1092 revision: IncrementalRevisionV0 { value: 1 },
1093 nodes: vec![IncrementalNodeInputV0 {
1094 id: "a".to_string(),
1095 digest: "a:v1".to_string(),
1096 dependency_ids: Vec::new(),
1097 }],
1098 };
1099 let snapshot = db.upsert_graph_input(&input);
1100 assert_eq!(snapshot.product, "omena-incremental.salsa-snapshot");
1101
1102 let Some(node) = db.node_input("a") else {
1103 return;
1104 };
1105 assert_eq!(
1106 read_salsa_incremental_node_digest(db.salsa_database(), node),
1107 "a:v1"
1108 );
1109 assert_eq!(
1110 read_salsa_incremental_node_dependency_ids(db.salsa_database(), node),
1111 Vec::<String>::new()
1112 );
1113 assert_eq!(SALSA_DIGEST_QUERY_RUNS.load(Ordering::Relaxed), 1);
1114 assert_eq!(SALSA_DEPENDENCY_QUERY_RUNS.load(Ordering::Relaxed), 1);
1115
1116 let next_input = IncrementalGraphInputV0 {
1117 revision: IncrementalRevisionV0 { value: 2 },
1118 nodes: vec![IncrementalNodeInputV0 {
1119 id: "a".to_string(),
1120 digest: "a:v1".to_string(),
1121 dependency_ids: vec!["root".to_string()],
1122 }],
1123 };
1124 db.upsert_graph_input(&next_input);
1125
1126 let Some(node) = db.node_input("a") else {
1127 return;
1128 };
1129 assert_eq!(
1130 read_salsa_incremental_node_digest(db.salsa_database(), node),
1131 "a:v1"
1132 );
1133 assert_eq!(
1134 read_salsa_incremental_node_dependency_ids(db.salsa_database(), node),
1135 vec!["root".to_string()]
1136 );
1137 assert_eq!(SALSA_DIGEST_QUERY_RUNS.load(Ordering::Relaxed), 1);
1138 assert_eq!(SALSA_DEPENDENCY_QUERY_RUNS.load(Ordering::Relaxed), 2);
1139 }
1140
1141 #[test]
1142 fn salsa_database_update_owns_plan_and_snapshot_progression() {
1143 let mut db = OmenaIncrementalDatabaseV0::default();
1144 let input = sample_input("a:v1", "b:v1", 1);
1145 let first = db.plan_and_upsert_graph_input(&input);
1146
1147 assert_eq!(first.product, "omena-incremental.salsa-database-update");
1148 assert_eq!(first.incremental_plan.dirty_node_count, 2);
1149 assert_eq!(
1150 first.next_snapshot.product,
1151 "omena-incremental.salsa-snapshot"
1152 );
1153 assert!(db.current_snapshot().is_some());
1154
1155 let unchanged = db.plan_and_upsert_graph_input(&sample_input("a:v1", "b:v1", 2));
1156 assert_eq!(unchanged.incremental_plan.dirty_node_count, 0);
1157
1158 let changed = db.plan_and_upsert_graph_input(&sample_input("a:v2", "b:v1", 3));
1159 assert_eq!(changed.incremental_plan.changed_input_count, 1);
1160 assert_eq!(changed.incremental_plan.dependency_dirty_count, 1);
1161 assert_eq!(changed.datalog_rule_evaluator.revision.value, 3);
1162 assert_eq!(
1163 changed.datalog_rule_evaluator.incremental_plan,
1164 changed.incremental_plan
1165 );
1166 assert_eq!(changed.datalog_rule_evaluator.dirty_node_count, 2);
1167 assert_eq!(changed.datalog_rule_evaluator.derived_node_count, 1);
1168 assert!(changed.datalog_rule_evaluator.fixed_point_reached);
1169 assert!(!changed.datalog_rule_evaluator.external_host_ready);
1170 }
1171
1172 #[test]
1173 fn cancellation_registry_tracks_and_consumes_request_ids() {
1174 let mut registry = IncrementalCancellationRegistryV0::with_limit(4);
1175
1176 registry.cancel("s:hover-1");
1177
1178 assert_eq!(registry.len(), 1);
1179 assert!(registry.take_cancelled("s:hover-1"));
1180 assert!(!registry.take_cancelled("s:hover-1"));
1181 assert!(registry.is_empty());
1182 }
1183
1184 #[test]
1185 fn cancellation_registry_bounds_stale_cancelled_requests() {
1186 let mut registry = IncrementalCancellationRegistryV0::with_limit(2);
1187
1188 registry.cancel("n:1");
1189 registry.cancel("n:2");
1190 registry.cancel("n:3");
1191
1192 let snapshot = registry.snapshot();
1193 assert_eq!(snapshot.product, "omena-incremental.cancellation-registry");
1194 assert_eq!(snapshot.cancelled_request_ids, vec!["n:3"]);
1195 }
1196
1197 fn sample_input(a_digest: &str, b_digest: &str, revision: u64) -> IncrementalGraphInputV0 {
1198 IncrementalGraphInputV0 {
1199 revision: IncrementalRevisionV0 { value: revision },
1200 nodes: vec![
1201 IncrementalNodeInputV0 {
1202 id: "b".to_string(),
1203 digest: b_digest.to_string(),
1204 dependency_ids: vec!["a".to_string()],
1205 },
1206 IncrementalNodeInputV0 {
1207 id: "a".to_string(),
1208 digest: a_digest.to_string(),
1209 dependency_ids: Vec::new(),
1210 },
1211 ],
1212 }
1213 }
1214
1215 fn cyclic_input(a_digest: &str, b_digest: &str, revision: u64) -> IncrementalGraphInputV0 {
1216 IncrementalGraphInputV0 {
1217 revision: IncrementalRevisionV0 { value: revision },
1218 nodes: vec![
1219 IncrementalNodeInputV0 {
1220 id: "a".to_string(),
1221 digest: a_digest.to_string(),
1222 dependency_ids: vec!["b".to_string()],
1223 },
1224 IncrementalNodeInputV0 {
1225 id: "b".to_string(),
1226 digest: b_digest.to_string(),
1227 dependency_ids: vec!["a".to_string()],
1228 },
1229 ],
1230 }
1231 }
1232
1233 fn node_reasons(plan: &super::IncrementalComputationPlanV0, id: &str) -> Vec<&'static str> {
1234 plan.nodes
1235 .iter()
1236 .find(|node| node.id == id)
1237 .map(|node| node.reasons.clone())
1238 .unwrap_or_default()
1239 }
1240}