marque_engine/scheduler.rs
1// SPDX-FileCopyrightText: 2026 Knitli Inc.
2//
3// SPDX-License-Identifier: LicenseRef-MarqueLicense-1.0
4
5//! Page-rewrite scheduler.
6//!
7//! `schedule_rewrites` runs Kahn's algorithm over the declared
8//! `PageRewrite::reads` / `writes` axes to produce a deterministic
9//! topological order — a rewrite that writes category `X` runs before
10//! any rewrite that reads `X`. Cycles and unannotated `Custom`
11//! rewrites abort construction with
12//! [`EngineConstructionError`](crate::errors::EngineConstructionError).
13//!
14//! This runs once at [`Engine::new`] and its output is cached on the
15//! engine instance; per-document rewrite evaluation walks the
16//! pre-computed order without re-sorting.
17
18use marque_scheme::{
19 CategoryAction, CategoryId, CategoryPredicate, MarkingScheme, PageRewrite, RewriteId,
20};
21use std::collections::{BTreeMap, BTreeSet};
22
23use crate::errors::EngineConstructionError;
24
25/// Compute the topological order of `rewrites` by their `reads` /
26/// `writes` axes.
27///
28/// Returns the ordered list of `RewriteId`s. Rewrites that have no
29/// predecessor (neither read nor write a category another rewrite
30/// writes) retain their declaration order relative to each other
31/// (FR-007: declaration-order independence for *cycle-free*
32/// inputs — but for rewrites with no edge between them, the only
33/// stable answer is declaration order).
34///
35/// # Errors
36///
37/// - [`EngineConstructionError::UnannotatedCustomAxes`] if any
38/// rewrite with a `Custom` trigger or action has empty `reads` or
39/// `writes`. Declarative rewrites (`Contains` / `Empty` triggers
40/// with `Clear` / `Replace` / `Promote` actions) are permitted to
41/// have empty annotations — the scheduler treats them as "no
42/// dataflow dependency" rather than as an authoring bug.
43/// - [`EngineConstructionError::RewriteCycle`] if the axis graph
44/// contains a cycle. All members participating in the cycle are
45/// reported, not just the entry point.
46pub fn schedule_rewrites<S>(
47 rewrites: &[PageRewrite<S>],
48) -> Result<Box<[RewriteId]>, EngineConstructionError>
49where
50 S: MarkingScheme + ?Sized,
51{
52 // 1. Enforce custom-annotation invariant.
53 for rw in rewrites {
54 let has_custom = rewrite_is_custom(rw);
55 if has_custom && (rw.reads.is_empty() || rw.writes.is_empty()) {
56 return Err(EngineConstructionError::UnannotatedCustomAxes { rewrite: rw.id });
57 }
58 }
59
60 // 2. Build the dependency graph: `edge(a, b)` iff `a` writes a
61 // category `b` reads. `a` must run before `b`.
62 //
63 // Keep the per-rewrite adjacency list as an ordered set so the
64 // traversal is deterministic across runs.
65 let n = rewrites.len();
66 let mut in_degree: Vec<usize> = vec![0; n];
67 let mut successors: Vec<BTreeSet<usize>> = vec![BTreeSet::new(); n];
68
69 // Map each category to the rewrites that write it. `BTreeMap` for
70 // stable iteration order.
71 let mut writers: BTreeMap<CategoryId, Vec<usize>> = BTreeMap::new();
72 for (idx, rw) in rewrites.iter().enumerate() {
73 for w in rw.writes {
74 writers.entry(*w).or_default().push(idx);
75 }
76 }
77
78 for (idx, rw) in rewrites.iter().enumerate() {
79 for read_cat in rw.reads {
80 let Some(producers) = writers.get(read_cat) else {
81 continue;
82 };
83 for &producer_idx in producers {
84 if producer_idx == idx {
85 // A rewrite that reads and writes the same
86 // category is a self-edge; don't count it.
87 continue;
88 }
89 // Producer must run before consumer: producer_idx → idx.
90 if successors[producer_idx].insert(idx) {
91 in_degree[idx] += 1;
92 }
93 }
94 }
95 }
96
97 // 3. Kahn's algorithm. Seed the frontier with indexes that have
98 // in-degree 0, preserving declaration order.
99 let mut frontier: std::collections::VecDeque<usize> =
100 (0..n).filter(|i| in_degree[*i] == 0).collect();
101 let mut scheduled: Vec<RewriteId> = Vec::with_capacity(n);
102 while let Some(idx) = frontier.pop_front() {
103 scheduled.push(rewrites[idx].id);
104 // Iterate successors in declaration order (BTreeSet iterates in
105 // sorted order; indexes are declaration-ordered).
106 for &succ in &successors[idx] {
107 in_degree[succ] -= 1;
108 if in_degree[succ] == 0 {
109 frontier.push_back(succ);
110 }
111 }
112 }
113
114 if scheduled.len() != n {
115 // Cycle detected. Extract the actual cycle participants via
116 // Tarjan's SCC so downstream-blocked rewrites (nodes that
117 // have in-degree > 0 only because a cycle upstream never
118 // resolves) are excluded from the reported `members` list.
119 // Self-edges (`a` reads and writes category `X`) are
120 // explicitly NOT counted as cycles above; a size-1 SCC
121 // without a self-successor is a downstream-blocked node,
122 // not a cycle member.
123 //
124 // When multiple disjoint cycles exist, report **one** of
125 // them — the one containing the lowest-index rewrite in
126 // declaration order. Flattening all cycles into a single
127 // `members` list would produce an error whose `axis` names
128 // one cycle and whose member set mixes unrelated rewrites,
129 // which is worse than naming just one cycle authoritatively.
130 // The author fixes this cycle, re-runs, and the next cycle
131 // (if any) surfaces on the next attempt.
132 let sccs = tarjan_sccs(n, &successors);
133 let mut cycle_sccs: Vec<Vec<usize>> = sccs
134 .into_iter()
135 .filter(|scc| {
136 // Size-1 SCCs reach this code only through
137 // downstream blocking — the self-loop case (a
138 // rewrite reading and writing the same category)
139 // is skipped earlier when building `successors`, so
140 // a singleton SCC never contains a true self-edge.
141 scc.len() > 1
142 })
143 .collect();
144 debug_assert!(
145 !cycle_sccs.is_empty(),
146 "scheduled.len() != n but Tarjan found no non-trivial SCC; \
147 this indicates a logic error in `schedule_rewrites` or in \
148 `tarjan_sccs`, because Kahn's algorithm only leaves nodes \
149 unscheduled when the residual graph contains a cycle."
150 );
151 // Pick the SCC that contains the rewrite with the lowest
152 // declaration index — deterministic across runs.
153 let picked = cycle_sccs
154 .iter_mut()
155 .min_by_key(|scc| scc.iter().min().copied().unwrap_or(usize::MAX))
156 .expect("debug_assert above guards the empty-Vec case");
157 picked.sort_unstable();
158 let axis = cycle_axis(rewrites, picked);
159 let members: Box<[RewriteId]> = picked
160 .iter()
161 .map(|&i| rewrites[i].id)
162 .collect::<Vec<_>>()
163 .into_boxed_slice();
164 return Err(EngineConstructionError::RewriteCycle { axis, members });
165 }
166
167 Ok(scheduled.into_boxed_slice())
168}
169
/// Tarjan's strongly-connected-components algorithm, driven by an
/// explicit stack instead of recursion.
///
/// Nodes are `0..n`. The edge `u → v` is present when
/// `successors[u].contains(&v)`. One `Vec<usize>` is returned per
/// component, in the order components are completed. Each component
/// lists its nodes in the order they come off Tarjan's stack.
///
/// Roots are tried in ascending order and successors are visited in
/// `BTreeSet` (sorted) order, so identical graphs always yield
/// identical output.
///
/// Singleton components are returned whether or not they form a cycle
/// and callers decide how to treat them. The scheduler keeps only
/// components with more than one node, because its graph has had
/// self-edges stripped.
///
/// The walk is iterative so that a scheme declaring many rewrites
/// (`CapcoScheme` has ≤10 today; a future CUI / NATO scheme may have
/// more) cannot overflow the call stack.
fn tarjan_sccs(n: usize, successors: &[BTreeSet<usize>]) -> Vec<Vec<usize>> {
    /// Per-node bookkeeping shared by every step of the walk.
    struct Walk {
        /// Discovery index of each node, `None` until first visited.
        order: Vec<Option<usize>>,
        /// Smallest discovery index reachable from the node's subtree.
        low: Vec<usize>,
        /// Whether the node currently sits on Tarjan's stack.
        on_stack: Vec<bool>,
        /// Tarjan's stack of nodes whose component is still open.
        stack: Vec<usize>,
        /// Next discovery index to hand out.
        clock: usize,
    }

    impl Walk {
        /// Give `v` its discovery index and push it onto Tarjan's stack.
        fn discover(&mut self, v: usize) {
            self.order[v] = Some(self.clock);
            self.low[v] = self.clock;
            self.clock += 1;
            self.stack.push(v);
            self.on_stack[v] = true;
        }

        /// If `v` roots a component, pop that component off the stack.
        fn close(&mut self, v: usize) -> Option<Vec<usize>> {
            let discovered = self.order[v].expect("index[v] was set at seed");
            if self.low[v] != discovered {
                return None;
            }
            let mut component = Vec::new();
            while let Some(w) = self.stack.pop() {
                self.on_stack[w] = false;
                component.push(w);
                if w == v {
                    break;
                }
            }
            Some(component)
        }
    }

    let mut walk = Walk {
        order: vec![None; n],
        low: vec![0; n],
        on_stack: vec![false; n],
        stack: Vec::new(),
        clock: 0,
    };
    let mut components: Vec<Vec<usize>> = Vec::new();
    // Each frame is a node plus the still-unvisited tail of its
    // successor set; `successors` is borrowed immutably for the whole
    // walk, so the frames can hold the set iterators directly.
    let mut frames: Vec<(usize, std::collections::btree_set::Iter<'_, usize>)> = Vec::new();

    for root in 0..n {
        if walk.order[root].is_some() {
            continue;
        }
        walk.discover(root);
        frames.push((root, successors[root].iter()));

        while let Some((v, pending)) = frames.last_mut() {
            let v = *v;
            match pending.next().copied() {
                // Tree edge: descend into a fresh node.
                Some(w) if walk.order[w].is_none() => {
                    walk.discover(w);
                    frames.push((w, successors[w].iter()));
                }
                // Back/cross edge: only nodes still on the stack matter.
                Some(w) => {
                    if walk.on_stack[w] {
                        let w_order = walk.order[w].expect("index[w] was set when w was pushed");
                        walk.low[v] = walk.low[v].min(w_order);
                    }
                }
                // `v` is exhausted: propagate its low-link upward, then
                // emit its component if it is a root.
                None => {
                    frames.pop();
                    if let Some(parent) = frames.last().map(|frame| frame.0) {
                        walk.low[parent] = walk.low[parent].min(walk.low[v]);
                    }
                    if let Some(component) = walk.close(v) {
                        components.push(component);
                    }
                }
            }
        }
    }

    components
}
268
269/// Does the rewrite's trigger or action contain a `Custom` variant?
270///
271/// `Custom` variants carry function pointers opaque to the scheduler,
272/// so their dataflow cannot be derived from the variant itself —
273/// callers must annotate `reads` / `writes` explicitly. Declarative
274/// variants can safely elide annotations because the category is
275/// carried in the variant payload.
276fn rewrite_is_custom<S: MarkingScheme + ?Sized>(rw: &PageRewrite<S>) -> bool {
277 // Match on explicit references so the predicate stays correct
278 // even if a future variant introduces non-`Copy` payloads that
279 // default-binding-mode ergonomics won't cover.
280 matches!(&rw.trigger, CategoryPredicate::Custom(_))
281 || matches!(&rw.action, CategoryAction::Custom(_))
282}
283
284/// Pick a category from the rewrite cycle to name in the error.
285///
286/// Any category that appears on both sides of the cycle is a valid
287/// answer. We pick the lowest category id that's both read and
288/// written by some member of the cycle for deterministic reporting.
289///
290/// The intersection is guaranteed non-empty when `indexes` names a
291/// real cycle — every edge `a → b` in the scheduler's graph exists
292/// because some `c ∈ a.writes ∩ b.reads`, and a cycle requires at
293/// least one such shared category. The `debug_assert!` below catches
294/// a broken invariant loudly in tests and debug builds; the release
295/// fallback returns `CategoryId(0)` only as a last-resort defense
296/// against a future refactor that calls this helper with a
297/// non-cyclic index set.
298fn cycle_axis<S: MarkingScheme + ?Sized>(
299 rewrites: &[PageRewrite<S>],
300 indexes: &[usize],
301) -> CategoryId {
302 let mut reads: BTreeSet<CategoryId> = BTreeSet::new();
303 let mut writes: BTreeSet<CategoryId> = BTreeSet::new();
304 for &i in indexes {
305 for r in rewrites[i].reads {
306 reads.insert(*r);
307 }
308 for w in rewrites[i].writes {
309 writes.insert(*w);
310 }
311 }
312 let picked = reads.intersection(&writes).next().copied();
313 debug_assert!(
314 picked.is_some(),
315 "cycle_axis called with no shared read/write axis; this should \
316 be unreachable when `indexes` names a real cycle in the scheduler \
317 graph. The release-mode fallback is CategoryId(0), but reaching \
318 it means `schedule_rewrites` classified a non-cycle as a cycle.",
319 );
320 picked.unwrap_or(CategoryId(0))
321}
322
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use marque_scheme::{
        Category, Constraint, ConstraintViolation, Lattice, Parsed, Scope, Template, TokenId,
        TokenRef,
    };

    // Minimal scheme used to exercise the scheduler without pulling in
    // marque-capco (unit tests within `marque-engine` should not force
    // a dependency on a specific rule crate). Because schedule_rewrites
    // only touches `reads` / `writes` / `id` / the trigger+action
    // variant shape, none of the other trait methods need real
    // behavior here.
    #[derive(Clone, Debug, PartialEq, Eq, Default)]
    struct StubMarking;

    impl Lattice for StubMarking {
        fn join(&self, _other: &Self) -> Self {
            Self
        }
        fn meet(&self, _other: &Self) -> Self {
            Self
        }
    }

    struct StubScheme;

    impl MarkingScheme for StubScheme {
        type Token = TokenId;
        type Marking = StubMarking;
        type ParseError = ();
        fn name(&self) -> &str {
            "stub"
        }
        fn schema_version(&self) -> &str {
            "v0"
        }
        fn categories(&self) -> &[Category] {
            &[]
        }
        fn constraints(&self) -> &[Constraint] {
            &[]
        }
        fn templates(&self) -> &[Template] {
            &[]
        }
        fn parse(&self, _: &str) -> Result<Parsed<Self::Marking>, Self::ParseError> {
            Err(())
        }
        fn satisfies(&self, _: &Self::Marking, _: &TokenRef) -> bool {
            false
        }
        fn validate(&self, _: &Self::Marking) -> Vec<ConstraintViolation> {
            vec![]
        }
        fn project(&self, _: Scope, _: &[Self::Marking]) -> Self::Marking {
            StubMarking
        }
        fn render_portion(&self, _: &Self::Marking) -> String {
            String::new()
        }
        fn render_banner(&self, _: &Self::Marking) -> String {
            String::new()
        }
    }

    const CAT_X: CategoryId = CategoryId(1);
    const CAT_Y: CategoryId = CategoryId(2);
    const CAT_Z: CategoryId = CategoryId(3);
    const CAT_W: CategoryId = CategoryId(4);

    /// Declarative rewrite whose trigger/action shape is irrelevant to
    /// the scheduler; only `id`, `reads` and `writes` matter.
    fn declarative(
        id: RewriteId,
        reads: &'static [CategoryId],
        writes: &'static [CategoryId],
    ) -> PageRewrite<StubScheme> {
        PageRewrite::declarative(
            id,
            "test",
            CategoryPredicate::Empty { category: CAT_X },
            CategoryAction::Clear { category: CAT_X },
            reads,
            writes,
        )
    }

    #[test]
    fn empty_input_is_empty_output() {
        let scheduled = schedule_rewrites::<StubScheme>(&[]).unwrap();
        assert!(scheduled.is_empty());
    }

    #[test]
    fn no_dependencies_preserves_declaration_order() {
        let rewrites = vec![
            declarative("a", &[], &[CAT_X]),
            declarative("b", &[], &[CAT_Y]),
            declarative("c", &[], &[CAT_Z]),
        ];
        let scheduled = schedule_rewrites(&rewrites).unwrap();
        assert_eq!(scheduled.as_ref(), ["a", "b", "c"]);
    }

    #[test]
    fn writer_before_reader() {
        // b reads X, a writes X ⇒ a must precede b in the schedule
        // regardless of declaration order.
        let rewrites = vec![
            declarative("b", &[CAT_X], &[CAT_Y]),
            declarative("a", &[], &[CAT_X]),
        ];
        let scheduled = schedule_rewrites(&rewrites).unwrap();
        assert_eq!(scheduled.as_ref(), ["a", "b"]);
    }

    #[test]
    fn transitive_chain_is_ordered_regardless_of_declaration() {
        // c reads Y (written by b); b reads X (written by a).
        let rewrites = vec![
            declarative("c", &[CAT_Y], &[CAT_Z]),
            declarative("b", &[CAT_X], &[CAT_Y]),
            declarative("a", &[], &[CAT_X]),
        ];
        let scheduled = schedule_rewrites(&rewrites).unwrap();
        assert_eq!(scheduled.as_ref(), ["a", "b", "c"]);
    }

    #[test]
    fn self_edge_is_permitted() {
        // A rewrite that reads and writes the same category has no
        // in-edge from itself — it's a no-op dependency.
        let rewrites = vec![declarative("a", &[CAT_X], &[CAT_X])];
        let scheduled = schedule_rewrites(&rewrites).unwrap();
        assert_eq!(scheduled.as_ref(), ["a"]);
    }

    #[test]
    fn two_rewrite_cycle_is_rejected() {
        // a writes X read by b; b writes Y read by a.
        let rewrites = vec![
            declarative("a", &[CAT_Y], &[CAT_X]),
            declarative("b", &[CAT_X], &[CAT_Y]),
        ];
        let Err(EngineConstructionError::RewriteCycle { axis, members }) =
            schedule_rewrites(&rewrites)
        else {
            panic!("expected a RewriteCycle error");
        };
        assert!(axis == CAT_X, "lowest axis inside the cycle should be reported");
        assert_eq!(members.as_ref(), ["a", "b"]);
    }

    #[test]
    fn cycle_report_excludes_downstream_blocked_rewrites() {
        // `c` is stuck only because it reads X from the a/b cycle; it
        // is not itself a cycle member.
        let rewrites = vec![
            declarative("a", &[CAT_Y], &[CAT_X]),
            declarative("b", &[CAT_X], &[CAT_Y]),
            declarative("c", &[CAT_X], &[CAT_Z]),
        ];
        let Err(EngineConstructionError::RewriteCycle { axis, members }) =
            schedule_rewrites(&rewrites)
        else {
            panic!("expected a RewriteCycle error");
        };
        assert!(axis == CAT_X, "lowest axis inside the cycle should be reported");
        assert_eq!(members.as_ref(), ["a", "b"]);
    }

    #[test]
    fn disjoint_cycles_report_the_earliest_declared_one() {
        // Two independent cycles: p ↔ q over Z/W and a ↔ b over X/Y.
        // `p` is declared first, so its cycle is the one reported.
        let rewrites = vec![
            declarative("p", &[CAT_W], &[CAT_Z]),
            declarative("a", &[CAT_Y], &[CAT_X]),
            declarative("q", &[CAT_Z], &[CAT_W]),
            declarative("b", &[CAT_X], &[CAT_Y]),
        ];
        let Err(EngineConstructionError::RewriteCycle { axis, members }) =
            schedule_rewrites(&rewrites)
        else {
            panic!("expected a RewriteCycle error");
        };
        assert!(axis == CAT_Z, "axis must come from the reported cycle");
        assert_eq!(members.as_ref(), ["p", "q"]);
    }
}