daedalus_runtime/
scheduler.rs

1use crate::plan::{BackpressureStrategy, EdgePolicyKind, RuntimePlan};
2use daedalus_planner::ExecutionPlan;
3use std::cmp::Reverse;
4use std::collections::BinaryHeap;
5
/// Scheduler configuration for edge policies and backpressure.
///
/// [`build_runtime`] copies every field of this struct onto the runtime plan
/// it produces. The [`Default`] implementation yields FIFO edges, no
/// backpressure, and lock-free queues disabled.
#[derive(Clone, Debug)]
pub struct SchedulerConfig {
    /// Default policy applied to all edges unless overridden.
    ///
    /// `build_runtime` currently assigns this policy to every edge.
    pub default_policy: EdgePolicyKind,
    /// Backpressure strategy for edge queues.
    pub backpressure: BackpressureStrategy,
    /// Prefer lock-free bounded edge queues when available.
    pub lockfree_queues: bool,
}
16
17impl Default for SchedulerConfig {
18    fn default() -> Self {
19        Self {
20            default_policy: EdgePolicyKind::Fifo,
21            backpressure: BackpressureStrategy::None,
22            lockfree_queues: false,
23        }
24    }
25}
26
27/// Build a runtime plan from an execution plan; later will wire policies and orchestrator.
28pub fn build_runtime(plan: &ExecutionPlan, config: &SchedulerConfig) -> RuntimePlan {
29    let mut runtime = RuntimePlan::from_execution(plan);
30    runtime.default_policy = config.default_policy.clone();
31    runtime.backpressure = config.backpressure.clone();
32    runtime.lockfree_queues = config.lockfree_queues;
33
34    // Assign configured default policy to all edges for now.
35    runtime
36        .edges
37        .iter_mut()
38        .for_each(|edge| edge.4 = config.default_policy.clone());
39
40    if let Some(order) = plan.graph.metadata.get("schedule_order") {
41        let mut id_to_ref = std::collections::HashMap::new();
42        for (idx, node) in runtime.nodes.iter().enumerate() {
43            id_to_ref.insert(node.id.as_str(), daedalus_planner::NodeRef(idx));
44        }
45        let schedule: Vec<daedalus_planner::NodeRef> = order
46            .split(',')
47            .filter_map(|id| id_to_ref.get(id.trim()).copied())
48            .collect();
49        if !schedule.is_empty() {
50            runtime.schedule_order = schedule;
51            return runtime;
52        }
53    }
54
55    if let Some(order) = topo_order(&runtime) {
56        runtime.schedule_order = order;
57        return runtime;
58    }
59
60    // Fallback scheduler: order nodes by compute priority, then original index.
61    let mut idxs: Vec<(usize, u8)> = runtime
62        .nodes
63        .iter()
64        .enumerate()
65        .map(|(i, n)| {
66            let p = match n.compute {
67                daedalus_planner::ComputeAffinity::GpuRequired => 0,
68                daedalus_planner::ComputeAffinity::GpuPreferred => 1,
69                daedalus_planner::ComputeAffinity::CpuOnly => 2,
70            };
71            (i, p)
72        })
73        .collect();
74    idxs.sort_by(|a, b| a.1.cmp(&b.1).then_with(|| a.0.cmp(&b.0)));
75    runtime.schedule_order = idxs
76        .into_iter()
77        .map(|(i, _)| daedalus_planner::NodeRef(i))
78        .collect();
79
80    runtime
81}
82
83fn topo_order(runtime: &RuntimePlan) -> Option<Vec<daedalus_planner::NodeRef>> {
84    let node_count = runtime.nodes.len();
85    if node_count == 0 {
86        return Some(Vec::new());
87    }
88    let mut indegree = vec![0usize; node_count];
89    let mut adj = vec![Vec::new(); node_count];
90    for (from, _, to, _, _) in &runtime.edges {
91        let from_idx = from.0;
92        let to_idx = to.0;
93        adj[from_idx].push(to_idx);
94        indegree[to_idx] += 1;
95    }
96
97    let mut heap: BinaryHeap<Reverse<(u8, usize)>> = BinaryHeap::new();
98    for (idx, &count) in indegree.iter().enumerate() {
99        if count == 0 {
100            heap.push(Reverse((node_priority(runtime, idx), idx)));
101        }
102    }
103
104    let mut order = Vec::with_capacity(node_count);
105    while let Some(Reverse((_prio, idx))) = heap.pop() {
106        order.push(daedalus_planner::NodeRef(idx));
107        for &next in &adj[idx] {
108            indegree[next] = indegree[next].saturating_sub(1);
109            if indegree[next] == 0 {
110                heap.push(Reverse((node_priority(runtime, next), next)));
111            }
112        }
113    }
114
115    if order.len() == node_count {
116        Some(order)
117    } else {
118        None
119    }
120}
121
122fn node_priority(runtime: &RuntimePlan, idx: usize) -> u8 {
123    match runtime.nodes[idx].compute {
124        daedalus_planner::ComputeAffinity::GpuRequired => 0,
125        daedalus_planner::ComputeAffinity::GpuPreferred => 1,
126        daedalus_planner::ComputeAffinity::CpuOnly => 2,
127    }
128}