//! Runtime plan types (`daedalus_runtime/plan.rs`).

1use serde::{Deserialize, Serialize};
2
3pub use daedalus_core::policy::BackpressureStrategy;
4use daedalus_planner::{ComputeAffinity, EdgeBufferInfo, ExecutionPlan, GpuSegment, NodeRef};
5
6/// Edge policy kinds; default is FIFO.
7///
8/// ```
9/// use daedalus_runtime::EdgePolicyKind;
10/// let policy = EdgePolicyKind::NewestWins;
11/// assert_eq!(policy, EdgePolicyKind::NewestWins);
12/// ```
13#[non_exhaustive]
14#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
15pub enum EdgePolicyKind {
16    #[default]
17    Fifo,
18    NewestWins,
19    Broadcast,
20    Bounded {
21        cap: usize,
22    },
23}
24
25/// Runtime node with policy hints.
26///
27/// ```
28/// use daedalus_runtime::RuntimeNode;
29/// use daedalus_planner::ComputeAffinity;
30/// let node = RuntimeNode {
31///     id: "demo".into(),
32///     bundle: None,
33///     label: None,
34///     compute: ComputeAffinity::CpuOnly,
35///     const_inputs: vec![],
36///     sync_groups: vec![],
37///     metadata: Default::default(),
38/// };
39/// assert_eq!(node.id, "demo");
40/// ```
41#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
42pub struct RuntimeNode {
43    pub id: String,
44    pub bundle: Option<String>,
45    pub label: Option<String>,
46    pub compute: ComputeAffinity,
47    #[serde(default, skip_serializing_if = "Vec::is_empty")]
48    pub const_inputs: Vec<(String, daedalus_data::model::Value)>,
49    #[serde(default, skip_serializing_if = "Vec::is_empty")]
50    pub sync_groups: Vec<daedalus_core::sync::SyncGroup>,
51    #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
52    pub metadata: std::collections::BTreeMap<String, daedalus_data::model::Value>,
53}
54
55/// A schedulable segment (may group GPU-required nodes).
56///
57/// ```
58/// use daedalus_runtime::RuntimeSegment;
59/// use daedalus_planner::{ComputeAffinity, NodeRef};
60/// let seg = RuntimeSegment { nodes: vec![NodeRef(0)], compute: ComputeAffinity::CpuOnly };
61/// assert_eq!(seg.nodes.len(), 1);
62/// ```
63#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
64pub struct RuntimeSegment {
65    pub nodes: Vec<NodeRef>,
66    pub compute: ComputeAffinity,
67}
68
69/// Final runtime plan, derived from planner output.
70///
71/// ```
72/// use daedalus_runtime::RuntimePlan;
73/// use daedalus_planner::{ExecutionPlan, Graph};
74/// let plan = RuntimePlan::from_execution(&ExecutionPlan::new(Graph::default(), vec![]));
75/// assert!(plan.nodes.is_empty());
76/// ```
77#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
78pub struct RuntimePlan {
79    pub default_policy: EdgePolicyKind,
80    pub backpressure: BackpressureStrategy,
81    /// Prefer lock-free bounded edge queues when available.
82    ///
83    /// This is a runtime knob; it should not appear in serialized runtime plans.
84    #[serde(skip)]
85    pub lockfree_queues: bool,
86    /// Graph-level metadata (typed values) propagated into `ExecutionContext.graph_metadata`.
87    #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
88    pub graph_metadata: std::collections::BTreeMap<String, daedalus_data::model::Value>,
89    pub nodes: Vec<RuntimeNode>,
90    pub edges: Vec<(NodeRef, String, NodeRef, String, EdgePolicyKind)>,
91    pub gpu_segments: Vec<GpuSegment>,
92    pub gpu_edges: Vec<EdgeBufferInfo>,
93    /// Edges that enter a GPU segment (CPU->GPU).
94    #[serde(default, skip_serializing_if = "Vec::is_empty")]
95    pub gpu_entries: Vec<usize>,
96    /// Edges that leave a GPU segment to CPU consumers.
97    #[serde(default, skip_serializing_if = "Vec::is_empty")]
98    pub gpu_exits: Vec<usize>,
99    pub segments: Vec<RuntimeSegment>,
100    pub schedule_order: Vec<NodeRef>,
101}
102
103impl RuntimePlan {
104    /// Convert a planner execution plan into a runtime plan.
105    pub fn from_execution(plan: &ExecutionPlan) -> Self {
106        let (gpu_segments, gpu_edges) = plan.graph.gpu_buffers();
107        let (gpu_entries, gpu_exits) = {
108            let mut entries = Vec::new();
109            let mut exits = Vec::new();
110            for info in &gpu_edges {
111                if !info.gpu_fast_path {
112                    // If the source is GPU and target is CPU, it's an exit; if source CPU and target GPU, entry.
113                    let from =
114                        plan.graph.nodes[plan.graph.edges[info.edge_index].from.node.0].compute;
115                    let to = plan.graph.nodes[plan.graph.edges[info.edge_index].to.node.0].compute;
116                    let from_gpu = matches!(
117                        from,
118                        ComputeAffinity::GpuPreferred | ComputeAffinity::GpuRequired
119                    );
120                    let to_gpu = matches!(
121                        to,
122                        ComputeAffinity::GpuPreferred | ComputeAffinity::GpuRequired
123                    );
124                    if !from_gpu && to_gpu {
125                        entries.push(info.edge_index);
126                    }
127                    if from_gpu && !to_gpu {
128                        exits.push(info.edge_index);
129                    }
130                }
131            }
132            (entries, exits)
133        };
134        let nodes = plan
135            .graph
136            .nodes
137            .iter()
138            .map(|n| RuntimeNode {
139                id: n.id.0.clone(),
140                bundle: n.bundle.clone(),
141                label: n.label.clone(),
142                compute: n.compute,
143                const_inputs: n.const_inputs.clone(),
144                sync_groups: n.sync_groups.clone(),
145                metadata: n.metadata.clone(),
146            })
147            .collect();
148        let edges = plan
149            .graph
150            .edges
151            .iter()
152            .map(|e| {
153                (
154                    e.from.node,
155                    e.from.port.clone(),
156                    e.to.node,
157                    e.to.port.clone(),
158                    EdgePolicyKind::Fifo,
159                )
160            })
161            .collect();
162
163        let mut order: Vec<NodeRef> = Vec::new();
164        if let Some(order_str) = plan.graph.metadata.get("schedule_order") {
165            let mut by_id: std::collections::HashMap<&str, usize> = std::collections::HashMap::new();
166            for (idx, node) in plan.graph.nodes.iter().enumerate() {
167                by_id.insert(node.id.0.as_str(), idx);
168            }
169            let mut seen = vec![false; plan.graph.nodes.len()];
170            for id in order_str.split(',').map(str::trim).filter(|v| !v.is_empty()) {
171                if let Some(idx) = by_id.get(id).copied()
172                    && !seen[idx]
173                {
174                    seen[idx] = true;
175                    order.push(NodeRef(idx));
176                }
177            }
178            for (idx, was_seen) in seen.iter().enumerate() {
179                if !*was_seen {
180                    order.push(NodeRef(idx));
181                }
182            }
183        } else {
184            order = (0..plan.graph.nodes.len()).map(NodeRef).collect();
185        }
186
187        // Simple segmentation: group consecutive GPU-pref/required nodes into a single segment,
188        // leave CPU-only nodes as singletons. This is a placeholder until planner emits segments.
189        let mut segments = Vec::new();
190        let mut current_gpu: Option<RuntimeSegment> = None;
191        for node_ref in &order {
192            let node = &plan.graph.nodes[node_ref.0];
193            match node.compute {
194                ComputeAffinity::GpuPreferred | ComputeAffinity::GpuRequired => {
195                    if let Some(seg) = &mut current_gpu {
196                        seg.nodes.push(*node_ref);
197                        if matches!(node.compute, ComputeAffinity::GpuRequired) {
198                            seg.compute = ComputeAffinity::GpuRequired;
199                        }
200                    } else {
201                        current_gpu = Some(RuntimeSegment {
202                            nodes: vec![*node_ref],
203                            compute: node.compute,
204                        });
205                    }
206                }
207                ComputeAffinity::CpuOnly => {
208                    if let Some(seg) = current_gpu.take() {
209                        segments.push(seg);
210                    }
211                    segments.push(RuntimeSegment {
212                        nodes: vec![*node_ref],
213                        compute: ComputeAffinity::CpuOnly,
214                    });
215                }
216            }
217        }
218        if let Some(seg) = current_gpu.take() {
219            segments.push(seg);
220        }
221
222        let graph_metadata = plan.graph.metadata_values.clone();
223
224        RuntimePlan {
225            default_policy: EdgePolicyKind::Fifo,
226            backpressure: BackpressureStrategy::None,
227            lockfree_queues: false,
228            graph_metadata,
229            nodes,
230            edges,
231            gpu_segments,
232            gpu_edges,
233            gpu_entries,
234            gpu_exits,
235            segments,
236            schedule_order: order,
237        }
238    }
239}