use serde::{Deserialize, Serialize};

pub use daedalus_core::policy::BackpressureStrategy;
use daedalus_planner::{ComputeAffinity, EdgeBufferInfo, ExecutionPlan, GpuSegment, NodeRef};

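/// Queueing policy applied to a single edge in the runtime graph.
///
/// `Fifo` is the default; `Bounded` carries an explicit capacity.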
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum EdgePolicyKind {
    #[default]
    Fifo,
    NewestWins,
    Broadcast,
    Bounded {
        cap: usize,
    },
}

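/// Serializable description of a single node in the runtime plan.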
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RuntimeNode {
    pub id: String,
    pub bundle: Option<String>,
    pub label: Option<String>,
    pub compute: ComputeAffinity,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub const_inputs: Vec<(String, daedalus_data::model::Value)>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub sync_groups: Vec<daedalus_core::sync::SyncGroup>,
    #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
    pub metadata: std::collections::BTreeMap<String, daedalus_data::model::Value>,
}

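/// A run of consecutively scheduled nodes that share a compute affinity.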
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RuntimeSegment {
    pub nodes: Vec<NodeRef>,
    pub compute: ComputeAffinity,
}

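/// Flattened, serializable form of a planner [`ExecutionPlan`].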
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RuntimePlan {
    pub default_policy: EdgePolicyKind,
    pub backpressure: BackpressureStrategy,
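    /// Runtime-only toggle for lock-free queue implementations; never serialized.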
    #[serde(skip)]
    pub lockfree_queues: bool,
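    /// Graph-level metadata values copied from the planner graph.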
    #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
    pub graph_metadata: std::collections::BTreeMap<String, daedalus_data::model::Value>,
    pub nodes: Vec<RuntimeNode>,
    pub edges: Vec<(NodeRef, String, NodeRef, String, EdgePolicyKind)>,
    pub gpu_segments: Vec<GpuSegment>,
    pub gpu_edges: Vec<EdgeBufferInfo>,
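    /// Indices of edges where data enters the GPU (CPU producer, GPU consumer)
    /// without a GPU fast path.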
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub gpu_entries: Vec<usize>,
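    /// Indices of edges where data leaves the GPU (GPU producer, CPU consumer)
    /// without a GPU fast path.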
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub gpu_exits: Vec<usize>,
    pub segments: Vec<RuntimeSegment>,
    pub schedule_order: Vec<NodeRef>,
}

impl RuntimePlan {
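    /// Builds a `RuntimePlan` from a planner [`ExecutionPlan`], deriving the GPU
    /// entry/exit edges, the node schedule, and compute-affinity segments.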
    pub fn from_execution(plan: &ExecutionPlan) -> Self {
        let (gpu_segments, gpu_edges) = plan.graph.gpu_buffers();
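        // Find boundary edges that cross between CPU and GPU nodes without a GPU
        // fast path: `entries` go CPU -> GPU, `exits` go GPU -> CPU.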
        let (gpu_entries, gpu_exits) = {
            let mut entries = Vec::new();
            let mut exits = Vec::new();
            for info in &gpu_edges {
                if !info.gpu_fast_path {
                    let from =
                        plan.graph.nodes[plan.graph.edges[info.edge_index].from.node.0].compute;
                    let to = plan.graph.nodes[plan.graph.edges[info.edge_index].to.node.0].compute;
                    let from_gpu = matches!(
                        from,
                        ComputeAffinity::GpuPreferred | ComputeAffinity::GpuRequired
                    );
                    let to_gpu = matches!(
                        to,
                        ComputeAffinity::GpuPreferred | ComputeAffinity::GpuRequired
                    );
                    if !from_gpu && to_gpu {
                        entries.push(info.edge_index);
                    }
                    if from_gpu && !to_gpu {
                        exits.push(info.edge_index);
                    }
                }
            }
            (entries, exits)
        };
        let nodes = plan
            .graph
            .nodes
            .iter()
            .map(|n| RuntimeNode {
                id: n.id.0.clone(),
                bundle: n.bundle.clone(),
                label: n.label.clone(),
                compute: n.compute,
                const_inputs: n.const_inputs.clone(),
                sync_groups: n.sync_groups.clone(),
                metadata: n.metadata.clone(),
            })
            .collect();
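        // Every edge starts out with the default FIFO policy.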
        let edges = plan
            .graph
            .edges
            .iter()
            .map(|e| {
                (
                    e.from.node,
                    e.from.port.clone(),
                    e.to.node,
                    e.to.port.clone(),
                    EdgePolicyKind::Fifo,
                )
            })
            .collect();

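        // Honour an explicit `schedule_order` from graph metadata (comma-separated
        // node ids), appending any nodes it omits; otherwise fall back to
        // declaration order.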
        let mut order: Vec<NodeRef> = Vec::new();
        if let Some(order_str) = plan.graph.metadata.get("schedule_order") {
            let mut by_id: std::collections::HashMap<&str, usize> =
                std::collections::HashMap::new();
            for (idx, node) in plan.graph.nodes.iter().enumerate() {
                by_id.insert(node.id.0.as_str(), idx);
            }
            let mut seen = vec![false; plan.graph.nodes.len()];
            for id in order_str.split(',').map(str::trim).filter(|v| !v.is_empty()) {
                if let Some(idx) = by_id.get(id).copied()
                    && !seen[idx]
                {
                    seen[idx] = true;
                    order.push(NodeRef(idx));
                }
            }
            for (idx, was_seen) in seen.iter().enumerate() {
                if !*was_seen {
                    order.push(NodeRef(idx));
                }
            }
        } else {
            order = (0..plan.graph.nodes.len()).map(NodeRef).collect();
        }

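        // Group scheduled nodes into segments: consecutive GPU-affine nodes are
        // merged (escalating to GpuRequired if any member requires it), while each
        // CPU-only node gets its own segment and breaks the current GPU run.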
        let mut segments = Vec::new();
        let mut current_gpu: Option<RuntimeSegment> = None;
        for node_ref in &order {
            let node = &plan.graph.nodes[node_ref.0];
            match node.compute {
                ComputeAffinity::GpuPreferred | ComputeAffinity::GpuRequired => {
                    if let Some(seg) = &mut current_gpu {
                        seg.nodes.push(*node_ref);
                        if matches!(node.compute, ComputeAffinity::GpuRequired) {
                            seg.compute = ComputeAffinity::GpuRequired;
                        }
                    } else {
                        current_gpu = Some(RuntimeSegment {
                            nodes: vec![*node_ref],
                            compute: node.compute,
                        });
                    }
                }
                ComputeAffinity::CpuOnly => {
                    if let Some(seg) = current_gpu.take() {
                        segments.push(seg);
                    }
                    segments.push(RuntimeSegment {
                        nodes: vec![*node_ref],
                        compute: ComputeAffinity::CpuOnly,
                    });
                }
            }
        }
        if let Some(seg) = current_gpu.take() {
            segments.push(seg);
        }

        let graph_metadata = plan.graph.metadata_values.clone();

        RuntimePlan {
            default_policy: EdgePolicyKind::Fifo,
            backpressure: BackpressureStrategy::None,
            lockfree_queues: false,
            graph_metadata,
            nodes,
            edges,
            gpu_segments,
            gpu_edges,
            gpu_entries,
            gpu_exits,
            segments,
            schedule_order: order,
        }
    }
}