Skip to main content

oximedia_graph/
scheduler.rs

//! Graph execution scheduler.
//!
//! Provides topological scheduling and resource-aware execution planning for
//! filter graphs.
5
6#![allow(dead_code)]
7
8use std::collections::{HashMap, VecDeque};
9
10// ─────────────────────────────────────────────────────────────────────────────
11// GraphTopology trait
12// ─────────────────────────────────────────────────────────────────────────────
13
/// Abstract view of a graph's topology needed for scheduling.
///
/// Implementors only have to expose node identifiers and, per node, the
/// identifiers of the nodes it depends on. Both methods return owned
/// `Vec<String>` values, so a scheduler does not borrow from the graph.
pub trait GraphTopology {
    /// Return all node IDs in the graph.
    fn nodes(&self) -> Vec<String>;
    /// Return IDs of nodes that `node_id` depends on (must run before it).
    ///
    /// [`TopologicalScheduler::schedule`] returns an error when a returned ID
    /// is not also reported by [`GraphTopology::nodes`].
    fn dependencies(&self, node_id: &str) -> Vec<String>;
}
21
22// ─────────────────────────────────────────────────────────────────────────────
23// ExecutionOrder
24// ─────────────────────────────────────────────────────────────────────────────
25
/// The result of topological scheduling.  Contains a flat execution order and
/// groups of nodes that can be executed in parallel.
///
/// As produced by [`TopologicalScheduler::schedule`], each group holds the
/// nodes whose dependencies all sit in earlier groups, each group is sorted
/// by node ID, and `node_ids` is the concatenation of the groups in order.
/// The fields are public, so callers may also build a value by hand.
#[derive(Debug, Clone)]
pub struct ExecutionOrder {
    /// Flat topological order of all node IDs.
    pub node_ids: Vec<String>,
    /// Groups of nodes with no mutual dependency that may run in parallel.
    pub parallelizable_groups: Vec<Vec<String>>,
}
35
36// ─────────────────────────────────────────────────────────────────────────────
37// TopologicalScheduler
38// ─────────────────────────────────────────────────────────────────────────────
39
40/// Produces a topological execution order using Kahn's algorithm and groups
41/// independent nodes into parallel execution groups.
42pub struct TopologicalScheduler;
43
44impl TopologicalScheduler {
45    /// Schedule all nodes in the graph.
46    ///
47    /// Returns an [`ExecutionOrder`] on success, or an error string if the
48    /// graph contains a cycle.
49    ///
50    /// # Errors
51    /// Returns `Err` when a cycle is detected.
52    pub fn schedule(graph: &dyn GraphTopology) -> Result<ExecutionOrder, String> {
53        let all_nodes = graph.nodes();
54        if all_nodes.is_empty() {
55            return Ok(ExecutionOrder {
56                node_ids: vec![],
57                parallelizable_groups: vec![],
58            });
59        }
60
61        // Build in-degree and adjacency list.
62        let mut in_degree: HashMap<&str, usize> = HashMap::new();
63        let mut successors: HashMap<&str, Vec<&str>> = HashMap::new();
64
65        for node in &all_nodes {
66            in_degree.entry(node.as_str()).or_insert(0);
67            successors.entry(node.as_str()).or_default();
68        }
69
70        for node in &all_nodes {
71            for dep in graph.dependencies(node.as_str()) {
72                // dep must run before node.
73                if let Some(succ) = successors.get_mut(dep.as_str()) {
74                    succ.push(node.as_str());
75                }
76                *in_degree.entry(node.as_str()).or_insert(0) += 1;
77            }
78        }
79
80        // Kahn's algorithm – process in waves (each wave = parallel group).
81        let mut queue: VecDeque<&str> = in_degree
82            .iter()
83            .filter(|(_, &deg)| deg == 0)
84            .map(|(&id, _)| id)
85            .collect();
86
87        let mut node_ids: Vec<String> = Vec::new();
88        let mut parallelizable_groups: Vec<Vec<String>> = Vec::new();
89        let mut remaining = all_nodes.len();
90
91        // We process level by level to build parallel groups.
92        let mut current_wave: Vec<&str> = queue.drain(..).collect();
93
94        while !current_wave.is_empty() {
95            // Sort for deterministic output.
96            current_wave.sort_unstable();
97            node_ids.extend(current_wave.iter().map(|s| s.to_string()));
98            parallelizable_groups.push(current_wave.iter().map(|s| s.to_string()).collect());
99            remaining -= current_wave.len();
100
101            let mut next_wave: Vec<&str> = Vec::new();
102            for &node in &current_wave {
103                if let Some(succs) = successors.get(node) {
104                    for &succ in succs {
105                        // succ was inserted into in_degree during graph construction above,
106                        // so this entry is guaranteed to exist.
107                        let deg = in_degree.entry(succ).or_insert(0);
108                        *deg -= 1;
109                        if *deg == 0 {
110                            next_wave.push(succ);
111                        }
112                    }
113                }
114            }
115            current_wave = next_wave;
116        }
117
118        if remaining > 0 {
119            return Err("Cycle detected in graph".to_string());
120        }
121
122        Ok(ExecutionOrder {
123            node_ids,
124            parallelizable_groups,
125        })
126    }
127}
128
129// ─────────────────────────────────────────────────────────────────────────────
130// Resource-aware scheduling
131// ─────────────────────────────────────────────────────────────────────────────
132
/// Upper bounds on the hardware resources an execution plan may use at once.
#[derive(Debug, Clone, Copy)]
pub struct ResourceConstraint {
    /// Ceiling on the number of CPU threads in use at the same time.
    pub max_cpu_threads: u32,
    /// Ceiling on the memory allocated at the same time, in megabytes.
    pub max_memory_mb: u64,
    /// `true` when a GPU is available.
    pub gpu_available: bool,
}

impl Default for ResourceConstraint {
    /// Baseline limits: 4 CPU threads, 2048 MB of memory, no GPU.
    fn default() -> Self {
        let max_cpu_threads = 4;
        let max_memory_mb = 2048;
        let gpu_available = false;

        Self {
            max_cpu_threads,
            max_memory_mb,
            gpu_available,
        }
    }
}
153
/// A single stage of a resource-aware execution plan.
///
/// When built by [`ResourceAwareScheduler::schedule`], the estimates are
/// derived purely from the node count: one CPU thread and 64 MB per node.
#[derive(Debug, Clone)]
pub struct ExecutionStage {
    /// Node IDs to execute in this stage.
    pub nodes: Vec<String>,
    /// Estimated CPU threads required.
    pub estimated_cpu_threads: u32,
    /// Estimated memory required (MB).
    pub estimated_memory_mb: u64,
}
164
/// A full resource-aware execution plan composed of ordered stages.
///
/// [`ResourceAwareScheduler::schedule`] emits the stages in the order of the
/// parallel groups it was given, so earlier stages precede later ones.
#[derive(Debug, Clone)]
pub struct ExecutionPlan {
    /// Ordered execution stages.
    pub stages: Vec<ExecutionStage>,
}
171
172/// Converts a topological [`ExecutionOrder`] into an [`ExecutionPlan`] that
173/// respects the given [`ResourceConstraint`].
174///
175/// Each parallelizable group becomes one or more stages, splitting groups that
176/// would exceed the thread or memory limits.
177pub struct ResourceAwareScheduler;
178
179impl ResourceAwareScheduler {
180    /// Build an execution plan from an execution order and resource constraints.
181    ///
182    /// Uses simple heuristics:
183    /// - 1 CPU thread per node.
184    /// - 64 MB estimated memory per node.
185    #[must_use]
186    pub fn schedule(order: &ExecutionOrder, constraints: &ResourceConstraint) -> ExecutionPlan {
187        const MEM_PER_NODE_MB: u64 = 64;
188
189        let mut stages: Vec<ExecutionStage> = Vec::new();
190
191        for group in &order.parallelizable_groups {
192            if group.is_empty() {
193                continue;
194            }
195
196            // Split the group into chunks that fit within constraints.
197            let max_nodes_by_threads = constraints.max_cpu_threads.max(1) as usize;
198            let max_nodes_by_mem = (constraints.max_memory_mb / MEM_PER_NODE_MB).max(1) as usize;
199            let chunk_size = max_nodes_by_threads.min(max_nodes_by_mem);
200
201            for chunk in group.chunks(chunk_size) {
202                let n = chunk.len() as u32;
203                stages.push(ExecutionStage {
204                    nodes: chunk.to_vec(),
205                    estimated_cpu_threads: n,
206                    estimated_memory_mb: n as u64 * MEM_PER_NODE_MB,
207                });
208            }
209        }
210
211        ExecutionPlan { stages }
212    }
213}
214
/// Result for a single node execution in a parallel stage.
///
/// Plain data record; nothing in the visible part of this file constructs it.
#[derive(Debug)]
pub struct ParallelNodeResult {
    /// Node identifier.
    pub node_id: String,
    /// Whether the node completed successfully.
    pub success: bool,
    /// Duration the node spent executing.
    pub elapsed: std::time::Duration,
    /// Optional error message if `success` is false.
    // NOTE(review): the type does not tie `error` to `success`; whoever fills
    // the record presumably keeps them consistent — confirm at the call site.
    pub error: Option<String>,
}
227
/// Aggregate statistics for a parallel execution run.
///
/// Derives `Default`, so a fresh value starts with every counter at zero and
/// a zero `total_elapsed`. Nothing in the visible part of this file updates
/// the counters.
#[derive(Debug, Default)]
pub struct ParallelRunStats {
    /// Total nodes executed.
    pub total_nodes: usize,
    /// Number of nodes executed (alias for compatibility).
    // NOTE(review): stored as a separate field, so it is only an alias of
    // `total_nodes` if the code filling the stats keeps both in sync — confirm.
    pub nodes_executed: usize,
    /// Number of nodes that succeeded.
    pub succeeded: usize,
    /// Number of nodes that failed.
    pub failures: usize,
    /// Total wall-clock duration.
    pub total_elapsed: std::time::Duration,
    /// Number of stages executed.
    pub stages_executed: usize,
    /// Maximum concurrency observed (nodes running in a single stage).
    pub max_concurrency: usize,
}
246
247// ─────────────────────────────────────────────────────────────────────────────
248// Unit tests
249// ─────────────────────────────────────────────────────────────────────────────
250
/// Unit tests for [`TopologicalScheduler`] and [`ResourceAwareScheduler`],
/// driven by a small in-memory [`GraphTopology`] implementation.
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Simple mock graph for testing.
    struct MockGraph {
        /// Adjacency list: node_id → list of dependency node_ids.
        deps: HashMap<String, Vec<String>>,
    }

    impl MockGraph {
        /// Create a graph with no nodes.
        fn new() -> Self {
            Self {
                deps: HashMap::new(),
            }
        }

        /// Register `id` as a node; existing dependencies are kept.
        fn add_node(&mut self, id: &str) {
            self.deps.entry(id.to_string()).or_default();
        }

        /// Record that `node` depends on `dep`.
        ///
        /// Registers `node` if needed, but does NOT register `dep` as a node.
        fn add_dep(&mut self, node: &str, dep: &str) {
            self.deps
                .entry(node.to_string())
                .or_default()
                .push(dep.to_string());
        }
    }

    impl GraphTopology for MockGraph {
        /// All registered node IDs, sorted.
        fn nodes(&self) -> Vec<String> {
            let mut ids: Vec<String> = self.deps.keys().cloned().collect();
            ids.sort(); // deterministic
            ids
        }

        /// Dependencies of `node_id`; empty for an unregistered node.
        fn dependencies(&self, node_id: &str) -> Vec<String> {
            self.deps.get(node_id).cloned().unwrap_or_default()
        }
    }

    // ── TopologicalScheduler ─────────────────────────────────────────────────

    /// An empty graph schedules successfully to an empty order.
    #[test]
    fn test_schedule_empty_graph() {
        let graph = MockGraph::new();
        let order = TopologicalScheduler::schedule(&graph).expect("schedule should succeed");
        assert!(order.node_ids.is_empty());
        assert!(order.parallelizable_groups.is_empty());
    }

    /// A lone node forms exactly one group.
    #[test]
    fn test_schedule_single_node() {
        let mut graph = MockGraph::new();
        graph.add_node("a");
        let order = TopologicalScheduler::schedule(&graph).expect("schedule should succeed");
        assert_eq!(order.node_ids, vec!["a"]);
        assert_eq!(order.parallelizable_groups.len(), 1);
    }

    /// A chain is ordered by dependency and yields one group per node.
    #[test]
    fn test_schedule_linear_chain() {
        // a → b → c
        let mut graph = MockGraph::new();
        graph.add_node("a");
        graph.add_node("b");
        graph.add_node("c");
        graph.add_dep("b", "a");
        graph.add_dep("c", "b");
        let order = TopologicalScheduler::schedule(&graph).expect("schedule should succeed");
        // Index of `id` in the flat order; panics if the node is missing.
        let pos = |id: &str| {
            order
                .node_ids
                .iter()
                .position(|x| x == id)
                .expect("iter should succeed")
        };
        assert!(pos("a") < pos("b"));
        assert!(pos("b") < pos("c"));
        // Each wave should be a single node.
        assert_eq!(order.parallelizable_groups.len(), 3);
    }

    /// Independent nodes share a single parallel group.
    #[test]
    fn test_schedule_parallel_nodes() {
        // a and b have no dependency → should be in the same group.
        let mut graph = MockGraph::new();
        graph.add_node("a");
        graph.add_node("b");
        let order = TopologicalScheduler::schedule(&graph).expect("schedule should succeed");
        assert_eq!(order.parallelizable_groups.len(), 1);
        assert_eq!(order.parallelizable_groups[0].len(), 2);
    }

    /// In a diamond, the root precedes both branches and both precede the sink.
    #[test]
    fn test_schedule_diamond() {
        // root → left, root → right, left → sink, right → sink
        let mut graph = MockGraph::new();
        graph.add_node("root");
        graph.add_node("left");
        graph.add_node("right");
        graph.add_node("sink");
        graph.add_dep("left", "root");
        graph.add_dep("right", "root");
        graph.add_dep("sink", "left");
        graph.add_dep("sink", "right");
        let order = TopologicalScheduler::schedule(&graph).expect("schedule should succeed");
        // Index of `id` in the flat order; panics if the node is missing.
        let pos = |id: &str| {
            order
                .node_ids
                .iter()
                .position(|x| x == id)
                .expect("iter should succeed")
        };
        assert!(pos("root") < pos("left"));
        assert!(pos("root") < pos("right"));
        assert!(pos("left") < pos("sink"));
        assert!(pos("right") < pos("sink"));
    }

    /// Every node of two disjoint chains appears exactly once in the order.
    #[test]
    fn test_schedule_all_nodes_included() {
        let mut graph = MockGraph::new();
        for id in ["a", "b", "c", "d"] {
            graph.add_node(id);
        }
        graph.add_dep("b", "a");
        graph.add_dep("d", "c");
        let order = TopologicalScheduler::schedule(&graph).expect("schedule should succeed");
        // Compare as a sorted list so the check ignores scheduling order.
        let mut sorted = order.node_ids.clone();
        sorted.sort();
        assert_eq!(sorted, vec!["a", "b", "c", "d"]);
    }

    /// A three-node cycle makes scheduling fail.
    #[test]
    fn test_schedule_cycle_detection() {
        // a → b → c → a (cycle)
        let mut graph = MockGraph::new();
        graph.add_node("a");
        graph.add_node("b");
        graph.add_node("c");
        graph.add_dep("b", "a");
        graph.add_dep("c", "b");
        graph.add_dep("a", "c"); // creates cycle
        let result = TopologicalScheduler::schedule(&graph);
        assert!(result.is_err(), "cycle should produce an error");
    }

    // ── ResourceAwareScheduler ────────────────────────────────────────────────

    /// An order without groups produces a plan without stages.
    #[test]
    fn test_resource_schedule_empty_order() {
        let order = ExecutionOrder {
            node_ids: vec![],
            parallelizable_groups: vec![],
        };
        let constraints = ResourceConstraint::default();
        let plan = ResourceAwareScheduler::schedule(&order, &constraints);
        assert!(plan.stages.is_empty());
    }

    /// A group larger than the thread limit is split across stages.
    #[test]
    fn test_resource_schedule_respects_thread_limit() {
        // 6 independent nodes with a 2-thread limit → at least 3 stages.
        let nodes: Vec<String> = (0..6).map(|i| format!("n{i}")).collect();
        let order = ExecutionOrder {
            node_ids: nodes.clone(),
            parallelizable_groups: vec![nodes],
        };
        let constraints = ResourceConstraint {
            max_cpu_threads: 2,
            max_memory_mb: 2048,
            gpu_available: false,
        };
        let plan = ResourceAwareScheduler::schedule(&order, &constraints);
        assert!(plan.stages.len() >= 3);
        for stage in &plan.stages {
            assert!(stage.estimated_cpu_threads <= 2);
        }
    }

    /// A single-node order yields one stage with non-zero estimates.
    #[test]
    fn test_resource_schedule_stage_fields() {
        let order = ExecutionOrder {
            node_ids: vec!["a".to_string()],
            parallelizable_groups: vec![vec!["a".to_string()]],
        };
        let constraints = ResourceConstraint::default();
        let plan = ResourceAwareScheduler::schedule(&order, &constraints);
        assert_eq!(plan.stages.len(), 1);
        let stage = &plan.stages[0];
        assert_eq!(stage.nodes, vec!["a"]);
        assert!(stage.estimated_cpu_threads >= 1);
        assert!(stage.estimated_memory_mb > 0);
    }

    /// End to end: every scheduled node shows up in some stage of the plan.
    #[test]
    fn test_resource_schedule_all_nodes_covered() {
        let mut graph = MockGraph::new();
        for id in ["a", "b", "c"] {
            graph.add_node(id);
        }
        graph.add_dep("b", "a");
        let order = TopologicalScheduler::schedule(&graph).expect("schedule should succeed");
        let constraints = ResourceConstraint::default();
        let plan = ResourceAwareScheduler::schedule(&order, &constraints);
        // Flatten all stages into a set of node IDs.
        let covered: HashSet<String> = plan.stages.iter().flat_map(|s| s.nodes.clone()).collect();
        assert!(covered.contains("a"));
        assert!(covered.contains("b"));
        assert!(covered.contains("c"));
    }
}
463}