Skip to main content

formualizer_eval/engine/
scheduler.rs

1use super::DependencyGraph;
2use super::vertex::VertexId;
3use formualizer_common::ExcelError;
4use rustc_hash::{FxHashMap, FxHashSet};
5
6pub struct Scheduler<'a> {
7    graph: &'a DependencyGraph,
8}
9
10#[derive(Debug, Clone)]
11pub struct Layer {
12    pub vertices: Vec<VertexId>,
13}
14
15/// One step of the canonical schedule walk: either an acyclic Kahn wave
16/// (`Layer`, an index into `Schedule::layers`) or a cyclic SCC treated as a
17/// single super-node (`Cycle`, an index into `Schedule::cycles`). Storing
18/// indices keeps `Schedule::layers`/`Schedule::cycles` the single owners of
19/// the vertex Vecs, so building or cloning a schedule never duplicates them.
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum ScheduleUnit {
22    Layer(u32),
23    Cycle(u32),
24}
25
26#[derive(Debug, Clone)]
27pub struct Schedule {
28    /// Canonical walk order: condensation order over the dependency graph.
29    /// Every `Cycle` unit appears after all units containing its external
30    /// dependencies and before all units containing its external dependents.
31    pub units: Vec<ScheduleUnit>,
32    /// All cyclic SCCs (same contents as before `units` existed), for
33    /// consumers that only need the set of cycles.
34    pub cycles: Vec<Vec<VertexId>>,
35    /// The `Layer` units in order, for consumers that only walk layers.
36    pub layers: Vec<Layer>,
37}
38
39impl Schedule {
40    fn from_parts(layers: Vec<Layer>, cycles: Vec<Vec<VertexId>>) -> Self {
41        debug_assert!(
42            cycles.is_empty(),
43            "Schedule::from_parts is the cycle-free fast path"
44        );
45        let units = (0..layers.len() as u32).map(ScheduleUnit::Layer).collect();
46        Schedule {
47            units,
48            cycles,
49            layers,
50        }
51    }
52
53    /// Resolve a `ScheduleUnit::Layer` index.
54    pub fn unit_layer(&self, i: u32) -> &Layer {
55        &self.layers[i as usize]
56    }
57
58    /// Resolve a `ScheduleUnit::Cycle` index.
59    pub fn unit_cycle(&self, i: u32) -> &[VertexId] {
60        &self.cycles[i as usize]
61    }
62}
63
64impl<'a> Scheduler<'a> {
65    pub fn new(graph: &'a DependencyGraph) -> Self {
66        Self { graph }
67    }
68
69    pub fn create_schedule(&self, vertices: &[VertexId]) -> Result<Schedule, ExcelError> {
70        #[cfg(feature = "tracing")]
71        let _span = tracing::info_span!("scheduler", vertices = vertices.len()).entered();
72        // 1. Find strongly connected components using Tarjan's algorithm
73        #[cfg(feature = "tracing")]
74        let _scc_span = tracing::info_span!("tarjan_scc").entered();
75        let sccs = self.tarjan_scc(vertices)?;
76        #[cfg(feature = "tracing")]
77        drop(_scc_span);
78
79        // 2. Separate cyclic from acyclic components
80        let (cycles, acyclic_sccs) = self.separate_cycles(sccs);
81
82        // 3. Topologically sort acyclic components into layers
83        if self.graph.dynamic_topo_enabled() {
84            // Dynamic-topo (PK) branch: pk_layers_for orders only the acyclic
85            // subset, so we cannot interleave Cycle units by condensation
86            // position here. Stay conservative on this experimental path and
87            // emit ALL Cycle units first (matching today's stamp-cycles-first
88            // semantics), then the pk layers as Layer units.
89            let subset: Vec<VertexId> = acyclic_sccs.into_iter().flatten().collect();
90            let layers = if subset.is_empty() {
91                Vec::new()
92            } else {
93                self.graph
94                    .pk_layers_for(&subset)
95                    .unwrap_or(self.build_layers(vec![subset])?)
96            };
97            let mut units = Vec::with_capacity(cycles.len() + layers.len());
98            let mut cycle_order: Vec<usize> = (0..cycles.len()).collect();
99            cycle_order.sort_by_key(|&i| cycles[i].iter().copied().min());
100            for i in cycle_order {
101                units.push(ScheduleUnit::Cycle(i as u32));
102            }
103            units.extend((0..layers.len() as u32).map(ScheduleUnit::Layer));
104            return Ok(Schedule {
105                units,
106                cycles,
107                layers,
108            });
109        }
110
111        if cycles.is_empty() {
112            // Fast path: byte-for-byte today's layer construction.
113            let layers = self.build_layers(acyclic_sccs)?;
114            return Ok(Schedule::from_parts(layers, cycles));
115        }
116
117        // Cycle path: Kahn over the condensation (SCC-as-node).
118        let (units, layers) = self.build_condensation_units(&cycles, acyclic_sccs, None)?;
119        Ok(Schedule {
120            units,
121            cycles,
122            layers,
123        })
124    }
125
126    /// Create a schedule considering additional ephemeral (virtual) dependencies just for this pass.
127    /// `vdeps` maps a vertex to extra dependency vertices that should be considered as incoming edges.
128    pub fn create_schedule_with_virtual(
129        &self,
130        vertices: &[VertexId],
131        vdeps: &FxHashMap<VertexId, Vec<VertexId>>,
132    ) -> Result<Schedule, ExcelError> {
133        #[cfg(feature = "tracing")]
134        let _span = tracing::info_span!(
135            "scheduler_with_virtual",
136            vertices = vertices.len(),
137            vdeps = vdeps.len()
138        )
139        .entered();
140        // 1. SCC detection with virtual deps
141        #[cfg(feature = "tracing")]
142        let _scc_span = tracing::info_span!("tarjan_scc_with_virtual").entered();
143        let sccs = self.tarjan_scc_with_virtual(vertices, vdeps)?;
144        #[cfg(feature = "tracing")]
145        drop(_scc_span);
146        // 2. Separate cycles and acyclic components
147        let (cycles, acyclic_sccs) = self.separate_cycles(sccs);
148        // 3. Build layers over combined adjacency (graph + vdeps)
149        #[cfg(feature = "tracing")]
150        let _layers_span = tracing::info_span!("build_layers_with_virtual").entered();
151        if cycles.is_empty() {
152            // Fast path: byte-for-byte today's layer construction.
153            let layers = self.build_layers_with_virtual(acyclic_sccs, vdeps)?;
154            return Ok(Schedule::from_parts(layers, cycles));
155        }
156        // Cycle path: Kahn over the condensation (SCC-as-node), honoring
157        // virtual deps as extra edges.
158        let (units, layers) = self.build_condensation_units(&cycles, acyclic_sccs, Some(vdeps))?;
159        Ok(Schedule {
160            units,
161            cycles,
162            layers,
163        })
164    }
165
166    /// Tarjan's strongly connected components algorithm
167    pub fn tarjan_scc(&self, vertices: &[VertexId]) -> Result<Vec<Vec<VertexId>>, ExcelError> {
168        self.tarjan_scc_impl(vertices, None)
169    }
170
171    /// Tarjan with virtual deps
172    fn tarjan_scc_with_virtual(
173        &self,
174        vertices: &[VertexId],
175        vdeps: &FxHashMap<VertexId, Vec<VertexId>>,
176    ) -> Result<Vec<Vec<VertexId>>, ExcelError> {
177        self.tarjan_scc_impl(vertices, Some(vdeps))
178    }
179
180    /// Iterative Tarjan over the scheduled subgraph, optionally honoring
181    /// per-pass virtual dependency edges.
182    ///
183    /// The DFS uses an explicit frame stack (vertex, dependency list, next
184    /// edge offset) instead of recursion, so depth is bounded by heap, not
185    /// the thread stack — the recursive predecessor SIGABRTed around depth
186    /// ~1500 in debug (2 MiB test stacks) on large SCCs AND on plain acyclic
187    /// chains whose dependencies point at higher vertex ids (see
188    /// `engine/tests/iterate_corpus_scale.rs`).
189    ///
190    /// Determinism contract: SCC emission order and within-SCC member order
191    /// are byte-identical to the recursive version (pinned by the
192    /// differential test in `engine/tests/tarjan_differential.rs` and the
193    /// ordering invariants in `engine/tests/schedule_units.rs`):
194    /// * dependencies are walked in adjacency order (base slice/Vec, then
195    ///   the vertex's virtual deps, exactly as the recursive code iterated);
196    /// * a vertex's SCC is emitted when its frame is exhausted and
197    ///   `lowlink == index` (the same program point as the recursive
198    ///   post-order emission);
199    /// * SCC members are popped off the Tarjan stack in the same order.
200    fn tarjan_scc_impl(
201        &self,
202        vertices: &[VertexId],
203        vdeps: Option<&FxHashMap<VertexId, Vec<VertexId>>>,
204    ) -> Result<Vec<Vec<VertexId>>, ExcelError> {
205        /// One vertex's dependency list, materialized once per DFS frame.
206        enum DepList<'g> {
207            Slice(&'g [VertexId]),
208            Owned(Vec<VertexId>),
209        }
210        impl DepList<'_> {
211            #[inline]
212            fn get(&self, i: usize) -> Option<VertexId> {
213                match self {
214                    DepList::Slice(s) => s.get(i).copied(),
215                    DepList::Owned(v) => v.get(i).copied(),
216                }
217            }
218        }
219
220        let deps_of = |vertex: VertexId| -> DepList<'_> {
221            // Edge order must match the recursive implementation: the base
222            // adjacency (zero-copy slice when available), with the vertex's
223            // virtual deps appended when present.
224            if let Some(extra) = vdeps.and_then(|m| m.get(&vertex)) {
225                let mut combined: Vec<VertexId> =
226                    if let Some(base) = self.graph.dependencies_slice(vertex) {
227                        base.to_vec()
228                    } else {
229                        self.graph.get_dependencies(vertex)
230                    };
231                combined.extend(extra.iter().copied());
232                DepList::Owned(combined)
233            } else if let Some(base) = self.graph.dependencies_slice(vertex) {
234                DepList::Slice(base)
235            } else {
236                DepList::Owned(self.graph.get_dependencies(vertex))
237            }
238        };
239
240        // Compact the candidate set into dense local ids once, up front. All
241        // per-edge / per-frame bookkeeping (index, lowlink, on-stack) then
242        // becomes a plain array op instead of an FxHashMap probe; the only
243        // remaining hash per edge is the `VertexId -> local id` membership
244        // lookup, which doubles as the old `vertex_set.contains` filter.
245        // `VertexId` is a `u32` newtype, so `u32` local ids cannot overflow.
246        const UNVISITED: u32 = u32::MAX;
247        let mut local_of: FxHashMap<VertexId, u32> =
248            FxHashMap::with_capacity_and_hasher(vertices.len(), Default::default());
249        let mut vertex_of_local: Vec<VertexId> = Vec::with_capacity(vertices.len());
250        for &v in vertices {
251            if let std::collections::hash_map::Entry::Vacant(slot) = local_of.entry(v) {
252                slot.insert(vertex_of_local.len() as u32);
253                vertex_of_local.push(v);
254            }
255        }
256        let n = vertex_of_local.len();
257
258        let mut index_counter: u32 = 0;
259        let mut indices: Vec<u32> = vec![UNVISITED; n];
260        let mut lowlinks: Vec<u32> = vec![0; n];
261        let mut on_stack: Vec<bool> = vec![false; n];
262        let mut stack: Vec<u32> = Vec::with_capacity(n);
263        let mut sccs: Vec<Vec<VertexId>> = Vec::new();
264
265        // Explicit DFS frames: (local id, its dependency list, next edge
266        // offset). Pre-sized to the worst case (one frame per vertex, e.g. a
267        // single deep chain) so deep schedules never re-grow the stack.
268        let mut frames: Vec<(u32, DepList<'_>, u32)> = Vec::with_capacity(n);
269
270        for &root_vertex in vertices {
271            let root = local_of[&root_vertex];
272            if indices[root as usize] != UNVISITED {
273                continue;
274            }
275            indices[root as usize] = index_counter;
276            lowlinks[root as usize] = index_counter;
277            index_counter += 1;
278            stack.push(root);
279            on_stack[root as usize] = true;
280            frames.push((root, deps_of(root_vertex), 0));
281
282            while let Some(frame) = frames.last_mut() {
283                let vertex = frame.0 as usize;
284                if let Some(dep_vertex) = frame.1.get(frame.2 as usize) {
285                    frame.2 += 1;
286                    // Only consider dependencies that are part of the current
287                    // scheduling task.
288                    let Some(&dep) = local_of.get(&dep_vertex) else {
289                        continue;
290                    };
291                    let d = dep as usize;
292                    if indices[d] == UNVISITED {
293                        // Not yet visited: descend (the recursive call).
294                        indices[d] = index_counter;
295                        lowlinks[d] = index_counter;
296                        index_counter += 1;
297                        stack.push(dep);
298                        on_stack[d] = true;
299                        frames.push((dep, deps_of(dep_vertex), 0));
300                    } else if on_stack[d] {
301                        // On the Tarjan stack: in the current SCC.
302                        let dep_index = indices[d];
303                        if dep_index < lowlinks[vertex] {
304                            lowlinks[vertex] = dep_index;
305                        }
306                    }
307                } else {
308                    // Vertex exhausted: emit its SCC if it is a root, then
309                    // return to the parent frame, folding the child lowlink in
310                    // (the post-recursion `min(lowlink[v], lowlink[dep])`).
311                    let vertex_lowlink = lowlinks[vertex];
312                    if vertex_lowlink == indices[vertex] {
313                        let mut scc = Vec::new();
314                        loop {
315                            let w = stack.pop().unwrap();
316                            on_stack[w as usize] = false;
317                            scc.push(vertex_of_local[w as usize]);
318                            if w as usize == vertex {
319                                break;
320                            }
321                        }
322                        sccs.push(scc);
323                    }
324                    frames.pop();
325                    if let Some(parent) = frames.last() {
326                        let p = parent.0 as usize;
327                        if vertex_lowlink < lowlinks[p] {
328                            lowlinks[p] = vertex_lowlink;
329                        }
330                    }
331                }
332            }
333        }
334
335        Ok(sccs)
336    }
337
338    /// Test-only visibility shim over the private virtual-deps entry, used by
339    /// the differential test in `engine/tests/tarjan_differential.rs`.
340    #[cfg(test)]
341    pub(crate) fn tarjan_scc_with_virtual_for_tests(
342        &self,
343        vertices: &[VertexId],
344        vdeps: &FxHashMap<VertexId, Vec<VertexId>>,
345    ) -> Result<Vec<Vec<VertexId>>, ExcelError> {
346        self.tarjan_scc_with_virtual(vertices, vdeps)
347    }
348
349    /// Recursive reference implementation, retained ONLY for the differential
350    /// test (`engine/tests/tarjan_differential.rs`) that proves the iterative
351    /// rewrite emits byte-identical SCC output. Never call on deep graphs
352    /// without a large stack: it overflows around depth ~1500 in debug.
353    #[cfg(test)]
354    pub(crate) fn tarjan_scc_recursive_reference(
355        &self,
356        vertices: &[VertexId],
357    ) -> Result<Vec<Vec<VertexId>>, ExcelError> {
358        let mut index_counter = 0;
359        let mut stack = Vec::new();
360        let mut indices = FxHashMap::default();
361        let mut lowlinks = FxHashMap::default();
362        let mut on_stack = FxHashSet::default();
363        let mut sccs = Vec::new();
364        let vertex_set: FxHashSet<VertexId> = vertices.iter().copied().collect();
365
366        for &vertex in vertices {
367            if !indices.contains_key(&vertex) {
368                self.tarjan_visit(
369                    vertex,
370                    &mut index_counter,
371                    &mut stack,
372                    &mut indices,
373                    &mut lowlinks,
374                    &mut on_stack,
375                    &mut sccs,
376                    &vertex_set,
377                )?;
378            }
379        }
380
381        Ok(sccs)
382    }
383
384    /// Recursive reference with virtual deps; see
385    /// [`Self::tarjan_scc_recursive_reference`].
386    #[cfg(test)]
387    pub(crate) fn tarjan_scc_with_virtual_recursive_reference(
388        &self,
389        vertices: &[VertexId],
390        vdeps: &FxHashMap<VertexId, Vec<VertexId>>,
391    ) -> Result<Vec<Vec<VertexId>>, ExcelError> {
392        let mut index_counter = 0;
393        let mut stack = Vec::new();
394        let mut indices = FxHashMap::default();
395        let mut lowlinks = FxHashMap::default();
396        let mut on_stack = FxHashSet::default();
397        let mut sccs = Vec::new();
398        let vertex_set: FxHashSet<VertexId> = vertices.iter().copied().collect();
399
400        for &vertex in vertices {
401            if !indices.contains_key(&vertex) {
402                self.tarjan_visit_with_virtual(
403                    vertex,
404                    &mut index_counter,
405                    &mut stack,
406                    &mut indices,
407                    &mut lowlinks,
408                    &mut on_stack,
409                    &mut sccs,
410                    &vertex_set,
411                    vdeps,
412                )?;
413            }
414        }
415
416        Ok(sccs)
417    }
418
419    #[cfg(test)]
420    #[allow(clippy::too_many_arguments)]
421    fn tarjan_visit(
422        &self,
423        vertex: VertexId,
424        index_counter: &mut usize,
425        stack: &mut Vec<VertexId>,
426        indices: &mut FxHashMap<VertexId, usize>,
427        lowlinks: &mut FxHashMap<VertexId, usize>,
428        on_stack: &mut FxHashSet<VertexId>,
429        sccs: &mut Vec<Vec<VertexId>>,
430        vertex_set: &FxHashSet<VertexId>,
431    ) -> Result<(), ExcelError> {
432        // Set the depth index for vertex to the smallest unused index
433        indices.insert(vertex, *index_counter);
434        lowlinks.insert(vertex, *index_counter);
435        *index_counter += 1;
436        stack.push(vertex);
437        on_stack.insert(vertex);
438
439        // Consider successors of vertex (dependencies)
440        if let Some(dependencies) = self.graph.dependencies_slice(vertex) {
441            for &dependency in dependencies {
442                // Only consider dependencies that are part of the current scheduling task
443                if !vertex_set.contains(&dependency) {
444                    continue;
445                }
446
447                if !indices.contains_key(&dependency) {
448                    // Successor dependency has not yet been visited; recurse on it
449                    self.tarjan_visit(
450                        dependency,
451                        index_counter,
452                        stack,
453                        indices,
454                        lowlinks,
455                        on_stack,
456                        sccs,
457                        vertex_set,
458                    )?;
459                    let dep_lowlink = lowlinks[&dependency];
460                    lowlinks.insert(vertex, lowlinks[&vertex].min(dep_lowlink));
461                } else if on_stack.contains(&dependency) {
462                    // Successor dependency is in stack and hence in the current SCC
463                    let dep_index = indices[&dependency];
464                    lowlinks.insert(vertex, lowlinks[&vertex].min(dep_index));
465                }
466            }
467        } else {
468            let dependencies = self.graph.get_dependencies(vertex);
469            for dependency in dependencies {
470                // Only consider dependencies that are part of the current scheduling task
471                if !vertex_set.contains(&dependency) {
472                    continue;
473                }
474
475                if !indices.contains_key(&dependency) {
476                    // Successor dependency has not yet been visited; recurse on it
477                    self.tarjan_visit(
478                        dependency,
479                        index_counter,
480                        stack,
481                        indices,
482                        lowlinks,
483                        on_stack,
484                        sccs,
485                        vertex_set,
486                    )?;
487                    let dep_lowlink = lowlinks[&dependency];
488                    lowlinks.insert(vertex, lowlinks[&vertex].min(dep_lowlink));
489                } else if on_stack.contains(&dependency) {
490                    // Successor dependency is in stack and hence in the current SCC
491                    let dep_index = indices[&dependency];
492                    lowlinks.insert(vertex, lowlinks[&vertex].min(dep_index));
493                }
494            }
495        }
496
497        // If vertex is a root node, pop the stack and print an SCC
498        if lowlinks[&vertex] == indices[&vertex] {
499            let mut scc = Vec::new();
500            loop {
501                let w = stack.pop().unwrap();
502                on_stack.remove(&w);
503                scc.push(w);
504                if w == vertex {
505                    break;
506                }
507            }
508            sccs.push(scc);
509        }
510
511        Ok(())
512    }
513
514    #[cfg(test)]
515    #[allow(clippy::too_many_arguments)]
516    fn tarjan_visit_with_virtual(
517        &self,
518        vertex: VertexId,
519        index_counter: &mut usize,
520        stack: &mut Vec<VertexId>,
521        indices: &mut FxHashMap<VertexId, usize>,
522        lowlinks: &mut FxHashMap<VertexId, usize>,
523        on_stack: &mut FxHashSet<VertexId>,
524        sccs: &mut Vec<Vec<VertexId>>,
525        vertex_set: &FxHashSet<VertexId>,
526        vdeps: &FxHashMap<VertexId, Vec<VertexId>>,
527    ) -> Result<(), ExcelError> {
528        // Set the depth index for vertex to the smallest unused index
529        indices.insert(vertex, *index_counter);
530        lowlinks.insert(vertex, *index_counter);
531        *index_counter += 1;
532        stack.push(vertex);
533        on_stack.insert(vertex);
534
535        // Consider successors of vertex (dependencies) including virtual deps
536        if let Some(extra) = vdeps.get(&vertex) {
537            let mut dependencies: Vec<VertexId> =
538                if let Some(base) = self.graph.dependencies_slice(vertex) {
539                    base.to_vec()
540                } else {
541                    self.graph.get_dependencies(vertex)
542                };
543            dependencies.extend(extra.iter().copied());
544
545            for dependency in dependencies {
546                // Only consider dependencies that are part of the current scheduling task
547                if !vertex_set.contains(&dependency) {
548                    continue;
549                }
550
551                if !indices.contains_key(&dependency) {
552                    // Successor dependency has not yet been visited; recurse on it
553                    self.tarjan_visit_with_virtual(
554                        dependency,
555                        index_counter,
556                        stack,
557                        indices,
558                        lowlinks,
559                        on_stack,
560                        sccs,
561                        vertex_set,
562                        vdeps,
563                    )?;
564                    let dep_lowlink = lowlinks[&dependency];
565                    lowlinks.insert(vertex, lowlinks[&vertex].min(dep_lowlink));
566                } else if on_stack.contains(&dependency) {
567                    // Successor dependency is in stack and hence in the current SCC
568                    let dep_index = indices[&dependency];
569                    lowlinks.insert(vertex, lowlinks[&vertex].min(dep_index));
570                }
571            }
572        } else if let Some(dependencies) = self.graph.dependencies_slice(vertex) {
573            for &dependency in dependencies {
574                // Only consider dependencies that are part of the current scheduling task
575                if !vertex_set.contains(&dependency) {
576                    continue;
577                }
578
579                if !indices.contains_key(&dependency) {
580                    // Successor dependency has not yet been visited; recurse on it
581                    self.tarjan_visit_with_virtual(
582                        dependency,
583                        index_counter,
584                        stack,
585                        indices,
586                        lowlinks,
587                        on_stack,
588                        sccs,
589                        vertex_set,
590                        vdeps,
591                    )?;
592                    let dep_lowlink = lowlinks[&dependency];
593                    lowlinks.insert(vertex, lowlinks[&vertex].min(dep_lowlink));
594                } else if on_stack.contains(&dependency) {
595                    // Successor dependency is in stack and hence in the current SCC
596                    let dep_index = indices[&dependency];
597                    lowlinks.insert(vertex, lowlinks[&vertex].min(dep_index));
598                }
599            }
600        } else {
601            let dependencies = self.graph.get_dependencies(vertex);
602            for dependency in dependencies {
603                // Only consider dependencies that are part of the current scheduling task
604                if !vertex_set.contains(&dependency) {
605                    continue;
606                }
607
608                if !indices.contains_key(&dependency) {
609                    // Successor dependency has not yet been visited; recurse on it
610                    self.tarjan_visit_with_virtual(
611                        dependency,
612                        index_counter,
613                        stack,
614                        indices,
615                        lowlinks,
616                        on_stack,
617                        sccs,
618                        vertex_set,
619                        vdeps,
620                    )?;
621                    let dep_lowlink = lowlinks[&dependency];
622                    lowlinks.insert(vertex, lowlinks[&vertex].min(dep_lowlink));
623                } else if on_stack.contains(&dependency) {
624                    // Successor dependency is in stack and hence in the current SCC
625                    let dep_index = indices[&dependency];
626                    lowlinks.insert(vertex, lowlinks[&vertex].min(dep_index));
627                }
628            }
629        }
630
631        // If vertex is a root node, pop the stack and produce an SCC
632        if lowlinks[&vertex] == indices[&vertex] {
633            let mut scc = Vec::new();
634            loop {
635                let w = stack.pop().unwrap();
636                on_stack.remove(&w);
637                scc.push(w);
638                if w == vertex {
639                    break;
640                }
641            }
642            sccs.push(scc);
643        }
644
645        Ok(())
646    }
647
648    pub(crate) fn separate_cycles(
649        &self,
650        sccs: Vec<Vec<VertexId>>,
651    ) -> (Vec<Vec<VertexId>>, Vec<Vec<VertexId>>) {
652        let mut cycles = Vec::new();
653        let mut acyclic = Vec::new();
654
655        for scc in sccs {
656            if scc.len() > 1 || (scc.len() == 1 && self.has_self_loop(scc[0])) {
657                cycles.push(scc);
658            } else {
659                acyclic.push(scc);
660            }
661        }
662
663        (cycles, acyclic)
664    }
665
666    fn has_self_loop(&self, vertex: VertexId) -> bool {
667        self.graph.has_self_loop(vertex)
668    }
669
670    pub(crate) fn build_layers(
671        &self,
672        acyclic_sccs: Vec<Vec<VertexId>>,
673    ) -> Result<Vec<Layer>, ExcelError> {
674        let vertices: Vec<VertexId> = acyclic_sccs.into_iter().flatten().collect();
675        if vertices.is_empty() {
676            return Ok(Vec::new());
677        }
678        let vertex_set: FxHashSet<VertexId> = vertices.iter().copied().collect();
679
680        // Calculate in-degrees for all vertices in the acyclic subgraph
681        let mut in_degrees: FxHashMap<VertexId, usize> = vertices.iter().map(|&v| (v, 0)).collect();
682        for &vertex_id in &vertices {
683            if let Some(dependencies) = self.graph.dependencies_slice(vertex_id) {
684                for &dep_id in dependencies {
685                    if vertex_set.contains(&dep_id)
686                        && let Some(in_degree) = in_degrees.get_mut(&vertex_id)
687                    {
688                        *in_degree += 1;
689                    }
690                }
691            } else {
692                let dependencies = self.graph.get_dependencies(vertex_id);
693                for dep_id in dependencies {
694                    if vertex_set.contains(&dep_id)
695                        && let Some(in_degree) = in_degrees.get_mut(&vertex_id)
696                    {
697                        *in_degree += 1;
698                    }
699                }
700            }
701        }
702
703        // Initialize the queue with all nodes having an in-degree of 0
704        let mut queue: std::collections::VecDeque<VertexId> = in_degrees
705            .iter()
706            .filter(|&(_, &in_degree)| in_degree == 0)
707            .map(|(&v, _)| v)
708            .collect();
709
710        let mut layers = Vec::new();
711        let mut processed_count = 0;
712
713        while !queue.is_empty() {
714            let mut current_layer_vertices = Vec::new();
715            for _ in 0..queue.len() {
716                let u = queue.pop_front().unwrap();
717                current_layer_vertices.push(u);
718                processed_count += 1;
719
720                // For each dependent of u, reduce its in-degree
721                if let Some(dependents) = self.graph.dependents_slice(u) {
722                    for &v_dep in dependents {
723                        if let Some(in_degree) = in_degrees.get_mut(&v_dep) {
724                            *in_degree -= 1;
725                            if *in_degree == 0 {
726                                queue.push_back(v_dep);
727                            }
728                        }
729                    }
730                } else {
731                    for v_dep in self.graph.get_dependents(u) {
732                        if let Some(in_degree) = in_degrees.get_mut(&v_dep) {
733                            *in_degree -= 1;
734                            if *in_degree == 0 {
735                                queue.push_back(v_dep);
736                            }
737                        }
738                    }
739                }
740            }
741            // Sort for deterministic output in tests
742            current_layer_vertices.sort();
743            layers.push(Layer {
744                vertices: current_layer_vertices,
745            });
746        }
747
748        if processed_count != vertices.len() {
749            return Err(
750                ExcelError::new(formualizer_common::ExcelErrorKind::Circ).with_message(
751                    "Unexpected cycle detected in acyclic components during layer construction"
752                        .to_string(),
753                ),
754            );
755        }
756
757        Ok(layers)
758    }
759
760    /// Kahn's algorithm over the condensation of the scheduled subgraph:
761    /// each cyclic SCC is a super-node, each acyclic vertex a singleton node.
762    ///
763    /// Per Kahn wave we emit first the wave's `Cycle` units (ordered by
764    /// smallest member `VertexId` for determinism), then one `Layer` unit with
765    /// the wave's singleton vertices (sorted, as in `build_layers`). Within a
766    /// wave there are no inter-node edges, so this ordering is semantically
767    /// free. The result guarantees every `Cycle` unit appears after all units
768    /// containing its external dependencies and before all units containing
769    /// its external dependents (pinned by tests in
770    /// `engine/tests/schedule_units.rs`).
771    ///
772    /// Returns the unit walk plus the `Layer` units in order (the
773    /// compatibility `Schedule::layers` view).
774    fn build_condensation_units(
775        &self,
776        cycles: &[Vec<VertexId>],
777        acyclic_sccs: Vec<Vec<VertexId>>,
778        vdeps: Option<&FxHashMap<VertexId, Vec<VertexId>>>,
779    ) -> Result<(Vec<ScheduleUnit>, Vec<Layer>), ExcelError> {
780        let singletons: Vec<VertexId> = acyclic_sccs.into_iter().flatten().collect();
781        let cycle_node_count = cycles.len();
782        let node_count = cycle_node_count + singletons.len();
783
784        // Map every scheduled vertex to its condensation node.
785        let mut node_of: FxHashMap<VertexId, usize> = FxHashMap::default();
786        for (i, cycle) in cycles.iter().enumerate() {
787            for &v in cycle {
788                node_of.insert(v, i);
789            }
790        }
791        for (j, &v) in singletons.iter().enumerate() {
792            node_of.insert(v, cycle_node_count + j);
793        }
794
795        // Build in-degrees and the dependents adjacency from a single scan of
796        // the dependency direction, so both sides are guaranteed consistent.
797        // Edges to vertices outside the scheduled set are ignored
798        // (membership == presence in `node_of`); intra-node edges are ignored.
799        let mut in_degrees: Vec<usize> = vec![0; node_count];
800        let mut node_dependents: Vec<Vec<usize>> = vec![Vec::new(); node_count];
801        let scan_dep = |from_node: usize,
802                        dep: VertexId,
803                        in_degrees: &mut Vec<usize>,
804                        node_dependents: &mut Vec<Vec<usize>>| {
805            if let Some(&dep_node) = node_of.get(&dep)
806                && dep_node != from_node
807            {
808                in_degrees[from_node] += 1;
809                node_dependents[dep_node].push(from_node);
810            }
811        };
812        for (&v, &n_v) in node_of.iter() {
813            if let Some(deps) = self.graph.dependencies_slice(v) {
814                for &dep in deps {
815                    scan_dep(n_v, dep, &mut in_degrees, &mut node_dependents);
816                }
817            } else {
818                for dep in self.graph.get_dependencies(v) {
819                    scan_dep(n_v, dep, &mut in_degrees, &mut node_dependents);
820                }
821            }
822            if let Some(extra) = vdeps.and_then(|m| m.get(&v)) {
823                for &dep in extra {
824                    scan_dep(n_v, dep, &mut in_degrees, &mut node_dependents);
825                }
826            }
827        }
828
829        // Kahn by waves over condensation nodes.
830        let mut current: Vec<usize> = (0..node_count).filter(|&n| in_degrees[n] == 0).collect();
831        let mut units = Vec::new();
832        let mut layers = Vec::new();
833        let mut processed_count = 0usize;
834        while !current.is_empty() {
835            let mut wave_cycles: Vec<usize> = current
836                .iter()
837                .copied()
838                .filter(|&n| n < cycle_node_count)
839                .collect();
840            wave_cycles.sort_by_key(|&n| cycles[n].iter().copied().min());
841            for n in wave_cycles {
842                units.push(ScheduleUnit::Cycle(n as u32));
843            }
844
845            let mut wave_vertices: Vec<VertexId> = current
846                .iter()
847                .copied()
848                .filter(|&n| n >= cycle_node_count)
849                .map(|n| singletons[n - cycle_node_count])
850                .collect();
851            if !wave_vertices.is_empty() {
852                // Sort for deterministic output, as in build_layers.
853                wave_vertices.sort();
854                units.push(ScheduleUnit::Layer(layers.len() as u32));
855                layers.push(Layer {
856                    vertices: wave_vertices,
857                });
858            }
859
860            processed_count += current.len();
861            let mut next = Vec::new();
862            for &n in &current {
863                for &dependent in &node_dependents[n] {
864                    in_degrees[dependent] -= 1;
865                    if in_degrees[dependent] == 0 {
866                        next.push(dependent);
867                    }
868                }
869            }
870            current = next;
871        }
872
873        if processed_count != node_count {
874            return Err(
875                ExcelError::new(formualizer_common::ExcelErrorKind::Circ).with_message(
876                    "Unexpected cycle detected in condensation during unit construction"
877                        .to_string(),
878                ),
879            );
880        }
881
882        Ok((units, layers))
883    }
884
885    pub(crate) fn build_layers_with_virtual(
886        &self,
887        acyclic_sccs: Vec<Vec<VertexId>>,
888        vdeps: &FxHashMap<VertexId, Vec<VertexId>>,
889    ) -> Result<Vec<Layer>, ExcelError> {
890        use std::collections::VecDeque;
891        let vertices: Vec<VertexId> = acyclic_sccs.into_iter().flatten().collect();
892        if vertices.is_empty() {
893            return Ok(Vec::new());
894        }
895        let vertex_set: FxHashSet<VertexId> = vertices.iter().copied().collect();
896
897        // Build combined adjacency (dependencies and dependents) within the subset
898        let mut combined_deps: FxHashMap<VertexId, Vec<VertexId>> = FxHashMap::default();
899        let mut combined_out: FxHashMap<VertexId, Vec<VertexId>> = FxHashMap::default();
900        for &v in &vertices {
901            let mut deps: Vec<VertexId> = Vec::new();
902            if let Some(base) = self.graph.dependencies_slice(v) {
903                deps.extend(base.iter().copied().filter(|d| vertex_set.contains(d)));
904            } else {
905                deps.extend(
906                    self.graph
907                        .get_dependencies(v)
908                        .into_iter()
909                        .filter(|d| vertex_set.contains(d)),
910                );
911            }
912            if let Some(extra) = vdeps.get(&v) {
913                deps.extend(extra.iter().copied().filter(|d| vertex_set.contains(d)));
914            }
915            deps.sort_unstable();
916            deps.dedup();
917            combined_deps.insert(v, deps);
918        }
919        // invert
920        for (&v, deps) in combined_deps.iter() {
921            for &d in deps {
922                combined_out.entry(d).or_default().push(v);
923            }
924        }
925        // in-degrees
926        let mut in_degrees: FxHashMap<VertexId, usize> = FxHashMap::default();
927        for &v in &vertices {
928            let indeg = combined_deps.get(&v).map(|v| v.len()).unwrap_or(0);
929            in_degrees.insert(v, indeg);
930        }
931        // queue of 0 in-degree
932        let mut queue: VecDeque<VertexId> = in_degrees
933            .iter()
934            .filter(|&(_, &deg)| deg == 0)
935            .map(|(&v, _)| v)
936            .collect();
937
938        let mut layers = Vec::new();
939        let mut processed_count = 0;
940        while !queue.is_empty() {
941            let mut cur = Vec::new();
942            for _ in 0..queue.len() {
943                let u = queue.pop_front().unwrap();
944                cur.push(u);
945                processed_count += 1;
946                if let Some(dependents) = combined_out.get(&u) {
947                    for &w in dependents {
948                        if let Some(ind) = in_degrees.get_mut(&w) {
949                            *ind = ind.saturating_sub(1);
950                            if *ind == 0 {
951                                queue.push_back(w);
952                            }
953                        }
954                    }
955                }
956            }
957            cur.sort_unstable();
958            layers.push(Layer { vertices: cur });
959        }
960        if processed_count != vertices.len() {
961            return Err(
962                ExcelError::new(formualizer_common::ExcelErrorKind::Circ).with_message(
963                    "Unexpected cycle detected in acyclic components during layer construction (virtual)"
964                        .to_string(),
965                ),
966            );
967        }
968        Ok(layers)
969    }
970}