Skip to main content

marque_engine/
scheduler.rs

1// SPDX-FileCopyrightText: 2026 Knitli Inc.
2//
3// SPDX-License-Identifier: LicenseRef-MarqueLicense-1.0
4
5//! Page-rewrite scheduler.
6//!
7//! `schedule_rewrites` runs Kahn's algorithm over the declared
8//! `PageRewrite::reads` / `writes` axes to produce a deterministic
9//! topological order — a rewrite that writes category `X` runs before
10//! any rewrite that reads `X`. Cycles and unannotated `Custom`
11//! rewrites abort construction with
12//! [`EngineConstructionError`](crate::errors::EngineConstructionError).
13//!
14//! This runs once at [`Engine::new`] and its output is cached on the
15//! engine instance; per-document rewrite evaluation walks the
16//! pre-computed order without re-sorting.
17
18use marque_scheme::{
19    CategoryAction, CategoryId, CategoryPredicate, MarkingScheme, PageRewrite, RewriteId,
20};
21use std::collections::{BTreeMap, BTreeSet};
22
23use crate::errors::EngineConstructionError;
24
25/// Compute the topological order of `rewrites` by their `reads` /
26/// `writes` axes.
27///
28/// Returns the ordered list of `RewriteId`s. Rewrites that have no
29/// predecessor (neither read nor write a category another rewrite
30/// writes) retain their declaration order relative to each other
31/// (FR-007: declaration-order independence for *cycle-free*
32/// inputs — but for rewrites with no edge between them, the only
33/// stable answer is declaration order).
34///
35/// # Errors
36///
37/// - [`EngineConstructionError::UnannotatedCustomAxes`] if any
38///   rewrite with a `Custom` trigger or action has empty `reads` or
39///   `writes`. Declarative rewrites (`Contains` / `Empty` triggers
40///   with `Clear` / `Replace` / `Promote` actions) are permitted to
41///   have empty annotations — the scheduler treats them as "no
42///   dataflow dependency" rather than as an authoring bug.
43/// - [`EngineConstructionError::RewriteCycle`] if the axis graph
44///   contains a cycle. All members participating in the cycle are
45///   reported, not just the entry point.
46pub fn schedule_rewrites<S>(
47    rewrites: &[PageRewrite<S>],
48) -> Result<Box<[RewriteId]>, EngineConstructionError>
49where
50    S: MarkingScheme + ?Sized,
51{
52    // 1. Enforce custom-annotation invariant.
53    for rw in rewrites {
54        let has_custom = rewrite_is_custom(rw);
55        if has_custom && (rw.reads.is_empty() || rw.writes.is_empty()) {
56            return Err(EngineConstructionError::UnannotatedCustomAxes { rewrite: rw.id });
57        }
58    }
59
60    // 2. Build the dependency graph: `edge(a, b)` iff `a` writes a
61    //    category `b` reads. `a` must run before `b`.
62    //
63    //    Keep the per-rewrite adjacency list as an ordered set so the
64    //    traversal is deterministic across runs.
65    let n = rewrites.len();
66    let mut in_degree: Vec<usize> = vec![0; n];
67    let mut successors: Vec<BTreeSet<usize>> = vec![BTreeSet::new(); n];
68
69    // Map each category to the rewrites that write it. `BTreeMap` for
70    // stable iteration order.
71    let mut writers: BTreeMap<CategoryId, Vec<usize>> = BTreeMap::new();
72    for (idx, rw) in rewrites.iter().enumerate() {
73        for w in rw.writes {
74            writers.entry(*w).or_default().push(idx);
75        }
76    }
77
78    for (idx, rw) in rewrites.iter().enumerate() {
79        for read_cat in rw.reads {
80            let Some(producers) = writers.get(read_cat) else {
81                continue;
82            };
83            for &producer_idx in producers {
84                if producer_idx == idx {
85                    // A rewrite that reads and writes the same
86                    // category is a self-edge; don't count it.
87                    continue;
88                }
89                // Producer must run before consumer: producer_idx → idx.
90                if successors[producer_idx].insert(idx) {
91                    in_degree[idx] += 1;
92                }
93            }
94        }
95    }
96
97    // 3. Kahn's algorithm. Seed the frontier with indexes that have
98    //    in-degree 0, preserving declaration order.
99    let mut frontier: std::collections::VecDeque<usize> =
100        (0..n).filter(|i| in_degree[*i] == 0).collect();
101    let mut scheduled: Vec<RewriteId> = Vec::with_capacity(n);
102    while let Some(idx) = frontier.pop_front() {
103        scheduled.push(rewrites[idx].id);
104        // Iterate successors in declaration order (BTreeSet iterates in
105        // sorted order; indexes are declaration-ordered).
106        for &succ in &successors[idx] {
107            in_degree[succ] -= 1;
108            if in_degree[succ] == 0 {
109                frontier.push_back(succ);
110            }
111        }
112    }
113
114    if scheduled.len() != n {
115        // Cycle detected. Extract the actual cycle participants via
116        // Tarjan's SCC so downstream-blocked rewrites (nodes that
117        // have in-degree > 0 only because a cycle upstream never
118        // resolves) are excluded from the reported `members` list.
119        // Self-edges (`a` reads and writes category `X`) are
120        // explicitly NOT counted as cycles above; a size-1 SCC
121        // without a self-successor is a downstream-blocked node,
122        // not a cycle member.
123        //
124        // When multiple disjoint cycles exist, report **one** of
125        // them — the one containing the lowest-index rewrite in
126        // declaration order. Flattening all cycles into a single
127        // `members` list would produce an error whose `axis` names
128        // one cycle and whose member set mixes unrelated rewrites,
129        // which is worse than naming just one cycle authoritatively.
130        // The author fixes this cycle, re-runs, and the next cycle
131        // (if any) surfaces on the next attempt.
132        let sccs = tarjan_sccs(n, &successors);
133        let mut cycle_sccs: Vec<Vec<usize>> = sccs
134            .into_iter()
135            .filter(|scc| {
136                // Size-1 SCCs reach this code only through
137                // downstream blocking — the self-loop case (a
138                // rewrite reading and writing the same category)
139                // is skipped earlier when building `successors`, so
140                // a singleton SCC never contains a true self-edge.
141                scc.len() > 1
142            })
143            .collect();
144        debug_assert!(
145            !cycle_sccs.is_empty(),
146            "scheduled.len() != n but Tarjan found no non-trivial SCC; \
147             this indicates a logic error in `schedule_rewrites` or in \
148             `tarjan_sccs`, because Kahn's algorithm only leaves nodes \
149             unscheduled when the residual graph contains a cycle."
150        );
151        // Pick the SCC that contains the rewrite with the lowest
152        // declaration index — deterministic across runs.
153        let picked = cycle_sccs
154            .iter_mut()
155            .min_by_key(|scc| scc.iter().min().copied().unwrap_or(usize::MAX))
156            .expect("debug_assert above guards the empty-Vec case");
157        picked.sort_unstable();
158        let axis = cycle_axis(rewrites, picked);
159        let members: Box<[RewriteId]> = picked
160            .iter()
161            .map(|&i| rewrites[i].id)
162            .collect::<Vec<_>>()
163            .into_boxed_slice();
164        return Err(EngineConstructionError::RewriteCycle { axis, members });
165    }
166
167    Ok(scheduled.into_boxed_slice())
168}
169
/// Partition a directed graph into strongly connected components
/// using an iterative form of Tarjan's algorithm.
///
/// Nodes are `0..n`; `successors[u]` is the set of nodes `v` with an
/// edge `u → v`. Roots are tried in ascending order and each node's
/// edges are followed in `BTreeSet` (ascending) order, so the result
/// is the same on every run for the same input.
///
/// Returns one `Vec<usize>` per component. Components are emitted in
/// the order their root finishes, and the members of a component are
/// listed in the order they are popped off the algorithm's stack
/// (most recently discovered first).
///
/// A single-node component is only a cycle if that node has an edge
/// to itself; callers decide how to treat singletons.
///
/// The depth-first search keeps its own explicit call stack instead
/// of recursing, so deep graphs cannot overflow the thread stack.
fn tarjan_sccs(n: usize, successors: &[BTreeSet<usize>]) -> Vec<Vec<usize>> {
    /// The not-yet-followed outgoing edges of one node.
    type Edges<'g> = std::collections::btree_set::Iter<'g, usize>;

    /// All mutable state of one run of the algorithm.
    struct Walk<'g> {
        graph: &'g [BTreeSet<usize>],
        /// Discovery number per node; `None` until first visited.
        discovery: Vec<Option<usize>>,
        /// Lowest discovery number known to be reachable per node.
        low: Vec<usize>,
        /// Whether a node currently sits on `open_nodes`.
        is_open: Vec<bool>,
        /// Visited nodes not yet assigned to a component.
        open_nodes: Vec<usize>,
        /// Explicit DFS call stack: node plus its remaining edges.
        calls: Vec<(usize, Edges<'g>)>,
        /// Next discovery number to hand out.
        clock: usize,
        components: Vec<Vec<usize>>,
    }

    impl<'g> Walk<'g> {
        /// First visit of `node`: number it and start exploring it.
        fn enter(&mut self, node: usize) {
            let graph: &'g [BTreeSet<usize>] = self.graph;
            self.discovery[node] = Some(self.clock);
            self.low[node] = self.clock;
            self.clock += 1;
            self.open_nodes.push(node);
            self.is_open[node] = true;
            self.calls.push((node, graph[node].iter()));
        }

        /// All edges of `node` are explored: return to its parent and,
        /// if `node` is the root of a component, emit that component.
        fn leave(&mut self, node: usize) {
            self.calls.pop();
            if let Some(parent) = self.calls.last().map(|frame| frame.0) {
                self.low[parent] = self.low[parent].min(self.low[node]);
            }

            let discovered_at = self.discovery[node].expect("index[v] was set at seed");
            if self.low[node] != discovered_at {
                return;
            }

            // Everything above (and including) `node` on the open
            // stack forms the component; list it top-of-stack first.
            let cut = self
                .open_nodes
                .iter()
                .rposition(|&member| member == node)
                .unwrap_or(0);
            let mut component = self.open_nodes.split_off(cut);
            component.reverse();
            for &member in &component {
                self.is_open[member] = false;
            }
            self.components.push(component);
        }
    }

    let mut walk = Walk {
        graph: successors,
        discovery: vec![None; n],
        low: vec![0; n],
        is_open: vec![false; n],
        open_nodes: Vec::new(),
        calls: Vec::new(),
        clock: 0,
        components: Vec::new(),
    };

    for root in 0..n {
        if walk.discovery[root].is_some() {
            continue;
        }
        walk.enter(root);

        while let Some((node, edges)) = walk.calls.last_mut() {
            let node = *node;
            let Some(&next) = edges.next() else {
                walk.leave(node);
                continue;
            };

            if walk.discovery[next].is_none() {
                // Tree edge: descend.
                walk.enter(next);
            } else if walk.is_open[next] {
                // Edge back into the component under construction.
                let seen_at =
                    walk.discovery[next].expect("index[w] was set when w was pushed");
                walk.low[node] = walk.low[node].min(seen_at);
            }
            // Otherwise `next` already belongs to a finished component.
        }
    }

    walk.components
}
268
269/// Does the rewrite's trigger or action contain a `Custom` variant?
270///
271/// `Custom` variants carry function pointers opaque to the scheduler,
272/// so their dataflow cannot be derived from the variant itself —
273/// callers must annotate `reads` / `writes` explicitly. Declarative
274/// variants can safely elide annotations because the category is
275/// carried in the variant payload.
276fn rewrite_is_custom<S: MarkingScheme + ?Sized>(rw: &PageRewrite<S>) -> bool {
277    // Match on explicit references so the predicate stays correct
278    // even if a future variant introduces non-`Copy` payloads that
279    // default-binding-mode ergonomics won't cover.
280    matches!(&rw.trigger, CategoryPredicate::Custom(_))
281        || matches!(&rw.action, CategoryAction::Custom(_))
282}
283
284/// Pick a category from the rewrite cycle to name in the error.
285///
286/// Any category that appears on both sides of the cycle is a valid
287/// answer. We pick the lowest category id that's both read and
288/// written by some member of the cycle for deterministic reporting.
289///
290/// The intersection is guaranteed non-empty when `indexes` names a
291/// real cycle — every edge `a → b` in the scheduler's graph exists
292/// because some `c ∈ a.writes ∩ b.reads`, and a cycle requires at
293/// least one such shared category. The `debug_assert!` below catches
294/// a broken invariant loudly in tests and debug builds; the release
295/// fallback returns `CategoryId(0)` only as a last-resort defense
296/// against a future refactor that calls this helper with a
297/// non-cyclic index set.
298fn cycle_axis<S: MarkingScheme + ?Sized>(
299    rewrites: &[PageRewrite<S>],
300    indexes: &[usize],
301) -> CategoryId {
302    let mut reads: BTreeSet<CategoryId> = BTreeSet::new();
303    let mut writes: BTreeSet<CategoryId> = BTreeSet::new();
304    for &i in indexes {
305        for r in rewrites[i].reads {
306            reads.insert(*r);
307        }
308        for w in rewrites[i].writes {
309            writes.insert(*w);
310        }
311    }
312    let picked = reads.intersection(&writes).next().copied();
313    debug_assert!(
314        picked.is_some(),
315        "cycle_axis called with no shared read/write axis; this should \
316         be unreachable when `indexes` names a real cycle in the scheduler \
317         graph. The release-mode fallback is CategoryId(0), but reaching \
318         it means `schedule_rewrites` classified a non-cycle as a cycle.",
319    );
320    picked.unwrap_or(CategoryId(0))
321}
322
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use marque_scheme::{
        Category, Constraint, ConstraintViolation, Lattice, Parsed, Scope, Template, TokenId,
        TokenRef,
    };

    // Minimal scheme used to exercise the scheduler without pulling in
    // marque-capco (unit tests within `marque-engine` should not force
    // a dependency on a specific rule crate). Because schedule_rewrites
    // only touches `reads` / `writes` / `id` / the trigger+action
    // variant shape, none of the other trait methods need real
    // behavior here.
    #[derive(Clone, Debug, PartialEq, Eq, Default)]
    struct StubMarking;

    impl Lattice for StubMarking {
        fn join(&self, _other: &Self) -> Self {
            Self
        }
        fn meet(&self, _other: &Self) -> Self {
            Self
        }
    }

    struct StubScheme;

    impl MarkingScheme for StubScheme {
        type Token = TokenId;
        type Marking = StubMarking;
        type ParseError = ();
        fn name(&self) -> &str {
            "stub"
        }
        fn schema_version(&self) -> &str {
            "v0"
        }
        fn categories(&self) -> &[Category] {
            &[]
        }
        fn constraints(&self) -> &[Constraint] {
            &[]
        }
        fn templates(&self) -> &[Template] {
            &[]
        }
        fn parse(&self, _: &str) -> Result<Parsed<Self::Marking>, Self::ParseError> {
            Err(())
        }
        fn satisfies(&self, _: &Self::Marking, _: &TokenRef) -> bool {
            false
        }
        fn validate(&self, _: &Self::Marking) -> Vec<ConstraintViolation> {
            vec![]
        }
        fn project(&self, _: Scope, _: &[Self::Marking]) -> Self::Marking {
            StubMarking
        }
        fn render_portion(&self, _: &Self::Marking) -> String {
            String::new()
        }
        fn render_banner(&self, _: &Self::Marking) -> String {
            String::new()
        }
    }

    const CAT_X: CategoryId = CategoryId(1);
    const CAT_Y: CategoryId = CategoryId(2);
    const CAT_Z: CategoryId = CategoryId(3);

    /// Build a declarative rewrite whose only interesting properties
    /// are its id and its `reads` / `writes` axes. The trigger and
    /// action are fixed placeholders.
    fn declarative(
        id: RewriteId,
        reads: &'static [CategoryId],
        writes: &'static [CategoryId],
    ) -> PageRewrite<StubScheme> {
        PageRewrite::declarative(
            id,
            "test",
            CategoryPredicate::Empty { category: CAT_X },
            CategoryAction::Clear { category: CAT_X },
            reads,
            writes,
        )
    }

    #[test]
    fn empty_input_is_empty_output() {
        let scheduled = schedule_rewrites::<StubScheme>(&[]).unwrap();
        assert!(scheduled.is_empty());
    }

    #[test]
    fn no_dependencies_preserves_declaration_order() {
        let rewrites = vec![
            declarative("a", &[], &[CAT_X]),
            declarative("b", &[], &[CAT_Y]),
            declarative("c", &[], &[CAT_Z]),
        ];
        let scheduled = schedule_rewrites(&rewrites).unwrap();
        assert_eq!(scheduled.as_ref(), ["a", "b", "c"]);
    }

    #[test]
    fn writer_before_reader() {
        // b reads X, a writes X ⇒ a must precede b in the schedule
        // regardless of declaration order.
        let rewrites = vec![
            declarative("b", &[CAT_X], &[CAT_Y]),
            declarative("a", &[], &[CAT_X]),
        ];
        let scheduled = schedule_rewrites(&rewrites).unwrap();
        assert_eq!(scheduled.as_ref(), ["a", "b"]);
    }

    #[test]
    fn self_edge_is_permitted() {
        // A rewrite that reads and writes the same category has no
        // in-edge from itself — it's a no-op dependency.
        let rewrites = vec![declarative("a", &[CAT_X], &[CAT_X])];
        let scheduled = schedule_rewrites(&rewrites).unwrap();
        assert_eq!(scheduled.as_ref(), ["a"]);
    }

    #[test]
    fn two_rewrite_cycle_is_reported() {
        // a writes X and reads Y; b writes Y and reads X ⇒ a → b → a.
        let rewrites = vec![
            declarative("a", &[CAT_Y], &[CAT_X]),
            declarative("b", &[CAT_X], &[CAT_Y]),
        ];
        match schedule_rewrites(&rewrites) {
            Err(EngineConstructionError::RewriteCycle { axis, members }) => {
                // Both X and Y lie on the cycle; the lowest id is named.
                assert!(axis == CAT_X);
                assert_eq!(members.as_ref(), ["a", "b"]);
            }
            _ => panic!("expected RewriteCycle for a two-rewrite cycle"),
        }
    }

    #[test]
    fn rewrite_downstream_of_cycle_is_not_a_member() {
        // a and b form a cycle; c only reads what a writes, so it is
        // blocked by the cycle but is not part of it.
        let rewrites = vec![
            declarative("a", &[CAT_Y], &[CAT_X]),
            declarative("b", &[CAT_X], &[CAT_Y]),
            declarative("c", &[CAT_X], &[CAT_Z]),
        ];
        match schedule_rewrites(&rewrites) {
            Err(EngineConstructionError::RewriteCycle { axis, members }) => {
                assert!(axis == CAT_X);
                assert_eq!(members.as_ref(), ["a", "b"]);
            }
            _ => panic!("expected RewriteCycle when a cycle blocks a downstream rewrite"),
        }
    }
}
447}