daedalus_runtime/scheduler.rs
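//! Scheduler: lowers a planner [`ExecutionPlan`] into a [`RuntimePlan`] and
//! picks a node execution order. An explicit order from plan metadata wins;
//! otherwise a topological order is used, falling back to a plain
//! compute-affinity sort when the graph contains a cycle.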
use crate::plan::{BackpressureStrategy, EdgePolicyKind, RuntimePlan};
use daedalus_planner::ExecutionPlan;
use std::cmp::Reverse;
use std::collections::BinaryHeap;

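/// Options used by [`build_runtime`] when lowering a plan: the default edge
/// policy stamped onto every edge, the backpressure strategy, and whether the
/// runtime should use lock-free queues.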
#[derive(Clone, Debug)]
pub struct SchedulerConfig {
    pub default_policy: EdgePolicyKind,
    pub backpressure: BackpressureStrategy,
    pub lockfree_queues: bool,
}

impl Default for SchedulerConfig {
    fn default() -> Self {
        Self {
            default_policy: EdgePolicyKind::Fifo,
            backpressure: BackpressureStrategy::None,
            lockfree_queues: false,
        }
    }
}

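/// Lowers `plan` into a [`RuntimePlan`] using `config`, then chooses a
/// `schedule_order` in three steps: an explicit `schedule_order` entry in the
/// plan metadata (comma-separated node ids) takes precedence, then a
/// GPU-first topological order, and finally a plain compute-affinity sort
/// when the graph is cyclic.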
pub fn build_runtime(plan: &ExecutionPlan, config: &SchedulerConfig) -> RuntimePlan {
    let mut runtime = RuntimePlan::from_execution(plan);
    runtime.default_policy = config.default_policy.clone();
    runtime.backpressure = config.backpressure.clone();
    runtime.lockfree_queues = config.lockfree_queues;

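    // Stamp the configured default policy onto every edge; the policy lives
    // in the edge tuple's fifth field.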
    runtime
        .edges
        .iter_mut()
        .for_each(|edge| edge.4 = config.default_policy.clone());

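    // First preference: an explicit schedule carried in the plan metadata as a
    // comma-separated list of node ids. Ids that don't match a node are skipped.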
    if let Some(order) = plan.graph.metadata.get("schedule_order") {
        let mut id_to_ref = std::collections::HashMap::new();
        for (idx, node) in runtime.nodes.iter().enumerate() {
            id_to_ref.insert(node.id.as_str(), daedalus_planner::NodeRef(idx));
        }
        let schedule: Vec<daedalus_planner::NodeRef> = order
            .split(',')
            .filter_map(|id| id_to_ref.get(id.trim()).copied())
            .collect();
        if !schedule.is_empty() {
            runtime.schedule_order = schedule;
            return runtime;
        }
    }

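    // Second preference: a topological order over the runtime edges. This only
    // fails when the graph contains a cycle.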
    if let Some(order) = topo_order(&runtime) {
        runtime.schedule_order = order;
        return runtime;
    }

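    // Last resort (cyclic graph): order nodes by compute affinity, GPU-required
    // first, breaking ties by node index so the result stays deterministic.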
    let mut idxs: Vec<(usize, u8)> = runtime
        .nodes
        .iter()
        .enumerate()
        .map(|(i, _)| (i, node_priority(&runtime, i)))
        .collect();
    idxs.sort_by(|a, b| a.1.cmp(&b.1).then_with(|| a.0.cmp(&b.0)));
    runtime.schedule_order = idxs
        .into_iter()
        .map(|(i, _)| daedalus_planner::NodeRef(i))
        .collect();

    runtime
}

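/// Kahn's algorithm over the runtime edge list, using a min-heap keyed by
/// (compute priority, node index) so that GPU-required nodes are released
/// first and the order is deterministic. Returns `None` if the graph has a
/// cycle.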
fn topo_order(runtime: &RuntimePlan) -> Option<Vec<daedalus_planner::NodeRef>> {
    let node_count = runtime.nodes.len();
    if node_count == 0 {
        return Some(Vec::new());
    }
    let mut indegree = vec![0usize; node_count];
    let mut adj = vec![Vec::new(); node_count];
    for (from, _, to, _, _) in &runtime.edges {
        let from_idx = from.0;
        let to_idx = to.0;
        adj[from_idx].push(to_idx);
        indegree[to_idx] += 1;
    }

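    // Seed the heap with every root (indegree zero); `Reverse` turns the
    // max-heap into a min-heap over (priority, index).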
    let mut heap: BinaryHeap<Reverse<(u8, usize)>> = BinaryHeap::new();
    for (idx, &count) in indegree.iter().enumerate() {
        if count == 0 {
            heap.push(Reverse((node_priority(runtime, idx), idx)));
        }
    }

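    // Pop the best available node, then release any successor whose remaining
    // indegree drops to zero.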
    let mut order = Vec::with_capacity(node_count);
    while let Some(Reverse((_prio, idx))) = heap.pop() {
        order.push(daedalus_planner::NodeRef(idx));
        for &next in &adj[idx] {
            indegree[next] = indegree[next].saturating_sub(1);
            if indegree[next] == 0 {
                heap.push(Reverse((node_priority(runtime, next), next)));
            }
        }
    }

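    // Only a complete ordering is usable; anything shorter means a cycle kept
    // some nodes from ever reaching indegree zero.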
    if order.len() == node_count {
        Some(order)
    } else {
        None
    }
}

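/// Scheduling priority for a node: lower values are scheduled earlier, so
/// GPU-required nodes come before GPU-preferred ones, with CPU-only last.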
fn node_priority(runtime: &RuntimePlan, idx: usize) -> u8 {
    match runtime.nodes[idx].compute {
        daedalus_planner::ComputeAffinity::GpuRequired => 0,
        daedalus_planner::ComputeAffinity::GpuPreferred => 1,
        daedalus_planner::ComputeAffinity::CpuOnly => 2,
    }
}