1#![allow(dead_code)]
7
8use std::collections::{HashMap, VecDeque};
9
/// Read-only view of a dependency graph, as consumed by
/// [`TopologicalScheduler::schedule`].
pub trait GraphTopology {
    /// Returns the ids of all nodes in the graph.
    fn nodes(&self) -> Vec<String>;
    /// Returns the ids that `node_id` depends on. The scheduler places each
    /// returned id in an earlier wave than `node_id`.
    fn dependencies(&self, node_id: &str) -> Vec<String>;
}
21
/// Result of [`TopologicalScheduler::schedule`]: the same nodes presented both
/// as one flat sequence and as dependency "waves".
#[derive(Debug, Clone)]
pub struct ExecutionOrder {
    /// Every scheduled node id, formed by concatenating the waves in order.
    pub node_ids: Vec<String>,
    /// One inner vector per wave. A node appears in a wave only after all of
    /// its dependencies appeared in earlier waves; ids inside a wave are sorted.
    pub parallelizable_groups: Vec<Vec<String>>,
}
35
36pub struct TopologicalScheduler;
43
44impl TopologicalScheduler {
45 pub fn schedule(graph: &dyn GraphTopology) -> Result<ExecutionOrder, String> {
53 let all_nodes = graph.nodes();
54 if all_nodes.is_empty() {
55 return Ok(ExecutionOrder {
56 node_ids: vec![],
57 parallelizable_groups: vec![],
58 });
59 }
60
61 let mut in_degree: HashMap<&str, usize> = HashMap::new();
63 let mut successors: HashMap<&str, Vec<&str>> = HashMap::new();
64
65 for node in &all_nodes {
66 in_degree.entry(node.as_str()).or_insert(0);
67 successors.entry(node.as_str()).or_default();
68 }
69
70 for node in &all_nodes {
71 for dep in graph.dependencies(node.as_str()) {
72 if let Some(succ) = successors.get_mut(dep.as_str()) {
74 succ.push(node.as_str());
75 }
76 *in_degree.entry(node.as_str()).or_insert(0) += 1;
77 }
78 }
79
80 let mut queue: VecDeque<&str> = in_degree
82 .iter()
83 .filter(|(_, °)| deg == 0)
84 .map(|(&id, _)| id)
85 .collect();
86
87 let mut node_ids: Vec<String> = Vec::new();
88 let mut parallelizable_groups: Vec<Vec<String>> = Vec::new();
89 let mut remaining = all_nodes.len();
90
91 let mut current_wave: Vec<&str> = queue.drain(..).collect();
93
94 while !current_wave.is_empty() {
95 current_wave.sort_unstable();
97 node_ids.extend(current_wave.iter().map(|s| s.to_string()));
98 parallelizable_groups.push(current_wave.iter().map(|s| s.to_string()).collect());
99 remaining -= current_wave.len();
100
101 let mut next_wave: Vec<&str> = Vec::new();
102 for &node in ¤t_wave {
103 if let Some(succs) = successors.get(node) {
104 for &succ in succs {
105 let deg = in_degree.entry(succ).or_insert(0);
108 *deg -= 1;
109 if *deg == 0 {
110 next_wave.push(succ);
111 }
112 }
113 }
114 }
115 current_wave = next_wave;
116 }
117
118 if remaining > 0 {
119 return Err("Cycle detected in graph".to_string());
120 }
121
122 Ok(ExecutionOrder {
123 node_ids,
124 parallelizable_groups,
125 })
126 }
127}
128
/// Resource limits used by [`ResourceAwareScheduler::schedule`] to decide how
/// many nodes may share one stage.
#[derive(Debug, Clone, Copy)]
pub struct ResourceConstraint {
    /// Upper bound on nodes per stage, one thread being assumed per node.
    pub max_cpu_threads: u32,
    /// Memory budget in megabytes; the scheduler divides it by a fixed
    /// per-node estimate to get a second bound on nodes per stage.
    pub max_memory_mb: u64,
    /// Whether a GPU is available. Not read by any code visible in this file.
    pub gpu_available: bool,
}

impl Default for ResourceConstraint {
    /// Conservative defaults: 4 threads, 2048 MB of memory, no GPU.
    fn default() -> Self {
        const DEFAULT_CPU_THREADS: u32 = 4;
        const DEFAULT_MEMORY_MB: u64 = 2048;

        ResourceConstraint {
            max_cpu_threads: DEFAULT_CPU_THREADS,
            max_memory_mb: DEFAULT_MEMORY_MB,
            gpu_available: false,
        }
    }
}
153
/// One stage of an [`ExecutionPlan`]: a batch of nodes taken from a single
/// parallelizable group, together with its estimated resource usage.
#[derive(Debug, Clone)]
pub struct ExecutionStage {
    /// Ids of the nodes placed in this stage.
    pub nodes: Vec<String>,
    /// Estimated thread usage; the scheduler sets it to the number of nodes.
    pub estimated_cpu_threads: u32,
    /// Estimated memory usage in megabytes; the scheduler sets it to the
    /// number of nodes times a flat 64 MB per-node figure.
    pub estimated_memory_mb: u64,
}
164
/// Output of [`ResourceAwareScheduler::schedule`].
#[derive(Debug, Clone)]
pub struct ExecutionPlan {
    /// Stages in the order the scheduler produced them: groups are visited in
    /// order and each group is split into consecutive chunks.
    /// NOTE(review): presumably stages are meant to run one after another —
    /// the executor is not visible here, confirm.
    pub stages: Vec<ExecutionStage>,
}
171
172pub struct ResourceAwareScheduler;
178
179impl ResourceAwareScheduler {
180 #[must_use]
186 pub fn schedule(order: &ExecutionOrder, constraints: &ResourceConstraint) -> ExecutionPlan {
187 const MEM_PER_NODE_MB: u64 = 64;
188
189 let mut stages: Vec<ExecutionStage> = Vec::new();
190
191 for group in &order.parallelizable_groups {
192 if group.is_empty() {
193 continue;
194 }
195
196 let max_nodes_by_threads = constraints.max_cpu_threads.max(1) as usize;
198 let max_nodes_by_mem = (constraints.max_memory_mb / MEM_PER_NODE_MB).max(1) as usize;
199 let chunk_size = max_nodes_by_threads.min(max_nodes_by_mem);
200
201 for chunk in group.chunks(chunk_size) {
202 let n = chunk.len() as u32;
203 stages.push(ExecutionStage {
204 nodes: chunk.to_vec(),
205 estimated_cpu_threads: n,
206 estimated_memory_mb: n as u64 * MEM_PER_NODE_MB,
207 });
208 }
209 }
210
211 ExecutionPlan { stages }
212 }
213}
214
/// Per-node result record.
///
/// NOTE(review): nothing visible in this file constructs or reads this type;
/// the field descriptions below are inferred from names and types only —
/// confirm against the code that produces it.
#[derive(Debug)]
pub struct ParallelNodeResult {
    /// Id of the node this record refers to.
    pub node_id: String,
    /// Presumably `true` when the node finished without error — confirm.
    pub success: bool,
    /// Presumably the time spent on this node — confirm.
    pub elapsed: std::time::Duration,
    /// Presumably the failure description when `success` is `false` — confirm.
    pub error: Option<String>,
}
227
/// Aggregate counters for one run; `Default` yields all-zero values.
///
/// NOTE(review): nothing visible in this file updates or reads this type; the
/// field descriptions below are inferred from names only — confirm against
/// the executor.
#[derive(Debug, Default)]
pub struct ParallelRunStats {
    /// Presumably the number of nodes in the plan — confirm.
    pub total_nodes: usize,
    /// Presumably the number of nodes actually run — confirm.
    pub nodes_executed: usize,
    /// Presumably the number of nodes that completed successfully — confirm.
    pub succeeded: usize,
    /// Presumably the number of nodes that failed — confirm.
    pub failures: usize,
    /// Presumably the duration of the whole run — confirm.
    pub total_elapsed: std::time::Duration,
    /// Presumably the number of stages that were run — confirm.
    pub stages_executed: usize,
    /// Presumably the largest number of nodes run at once — confirm.
    pub max_concurrency: usize,
}
246
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// In-memory graph fixture: maps each node id to the ids it depends on.
    struct MockGraph {
        deps: HashMap<String, Vec<String>>,
    }

    impl MockGraph {
        fn new() -> Self {
            Self {
                deps: HashMap::new(),
            }
        }

        /// Registers `id` without adding any dependency.
        fn add_node(&mut self, id: &str) {
            self.deps.entry(id.to_string()).or_default();
        }

        /// Records that `node` depends on `dep` (registering `node` if needed).
        fn add_dep(&mut self, node: &str, dep: &str) {
            self.deps
                .entry(node.to_string())
                .or_default()
                .push(dep.to_string());
        }
    }

    impl GraphTopology for MockGraph {
        fn nodes(&self) -> Vec<String> {
            // Sorted so the fixture does not leak HashMap iteration order.
            let mut ids: Vec<String> = self.deps.keys().cloned().collect();
            ids.sort();
            ids
        }

        fn dependencies(&self, node_id: &str) -> Vec<String> {
            self.deps.get(node_id).cloned().unwrap_or_default()
        }
    }

    /// Index of `id` in the flattened order; panics naming the missing node.
    fn position_of(order: &ExecutionOrder, id: &str) -> usize {
        order
            .node_ids
            .iter()
            .position(|x| x == id)
            .unwrap_or_else(|| panic!("node `{}` missing from execution order", id))
    }

    // ---- TopologicalScheduler ----

    #[test]
    fn test_schedule_empty_graph() {
        let graph = MockGraph::new();
        let order = TopologicalScheduler::schedule(&graph).expect("schedule should succeed");
        assert!(order.node_ids.is_empty());
        assert!(order.parallelizable_groups.is_empty());
    }

    #[test]
    fn test_schedule_single_node() {
        let mut graph = MockGraph::new();
        graph.add_node("a");
        let order = TopologicalScheduler::schedule(&graph).expect("schedule should succeed");
        assert_eq!(order.node_ids, vec!["a"]);
        assert_eq!(order.parallelizable_groups, vec![vec!["a"]]);
    }

    #[test]
    fn test_schedule_linear_chain() {
        let mut graph = MockGraph::new();
        graph.add_node("a");
        graph.add_node("b");
        graph.add_node("c");
        graph.add_dep("b", "a");
        graph.add_dep("c", "b");
        let order = TopologicalScheduler::schedule(&graph).expect("schedule should succeed");
        assert!(position_of(&order, "a") < position_of(&order, "b"));
        assert!(position_of(&order, "b") < position_of(&order, "c"));
        // A chain offers no parallelism: one node per wave.
        assert_eq!(order.parallelizable_groups.len(), 3);
        assert!(order.parallelizable_groups.iter().all(|g| g.len() == 1));
    }

    #[test]
    fn test_schedule_parallel_nodes() {
        let mut graph = MockGraph::new();
        graph.add_node("a");
        graph.add_node("b");
        let order = TopologicalScheduler::schedule(&graph).expect("schedule should succeed");
        // Independent nodes share a single wave, sorted by id.
        assert_eq!(order.parallelizable_groups, vec![vec!["a", "b"]]);
    }

    #[test]
    fn test_schedule_diamond() {
        let mut graph = MockGraph::new();
        graph.add_node("root");
        graph.add_node("left");
        graph.add_node("right");
        graph.add_node("sink");
        graph.add_dep("left", "root");
        graph.add_dep("right", "root");
        graph.add_dep("sink", "left");
        graph.add_dep("sink", "right");
        let order = TopologicalScheduler::schedule(&graph).expect("schedule should succeed");
        assert!(position_of(&order, "root") < position_of(&order, "left"));
        assert!(position_of(&order, "root") < position_of(&order, "right"));
        assert!(position_of(&order, "left") < position_of(&order, "sink"));
        assert!(position_of(&order, "right") < position_of(&order, "sink"));
        assert_eq!(order.parallelizable_groups.len(), 3);
    }

    #[test]
    fn test_schedule_all_nodes_included() {
        let mut graph = MockGraph::new();
        for id in ["a", "b", "c", "d"] {
            graph.add_node(id);
        }
        graph.add_dep("b", "a");
        graph.add_dep("d", "c");
        let order = TopologicalScheduler::schedule(&graph).expect("schedule should succeed");
        let mut sorted = order.node_ids.clone();
        sorted.sort();
        assert_eq!(sorted, vec!["a", "b", "c", "d"]);
    }

    #[test]
    fn test_schedule_cycle_detection() {
        let mut graph = MockGraph::new();
        graph.add_node("a");
        graph.add_node("b");
        graph.add_node("c");
        graph.add_dep("b", "a");
        graph.add_dep("c", "b");
        // Closes the loop a -> b -> c -> a.
        graph.add_dep("a", "c");
        let result = TopologicalScheduler::schedule(&graph);
        assert!(result.is_err(), "cycle should produce an error");
    }

    // ---- ResourceAwareScheduler ----

    #[test]
    fn test_resource_schedule_empty_order() {
        let order = ExecutionOrder {
            node_ids: vec![],
            parallelizable_groups: vec![],
        };
        let constraints = ResourceConstraint::default();
        let plan = ResourceAwareScheduler::schedule(&order, &constraints);
        assert!(plan.stages.is_empty());
    }

    #[test]
    fn test_resource_schedule_respects_thread_limit() {
        let nodes: Vec<String> = (0..6).map(|i| format!("n{i}")).collect();
        let order = ExecutionOrder {
            node_ids: nodes.clone(),
            parallelizable_groups: vec![nodes],
        };
        let constraints = ResourceConstraint {
            max_cpu_threads: 2,
            max_memory_mb: 2048,
            gpu_available: false,
        };
        let plan = ResourceAwareScheduler::schedule(&order, &constraints);
        // Six nodes at two per stage: exactly three stages, none oversized.
        assert_eq!(plan.stages.len(), 3);
        for stage in &plan.stages {
            assert!(stage.estimated_cpu_threads <= 2);
            assert_eq!(stage.nodes.len(), 2);
        }
    }

    #[test]
    fn test_resource_schedule_stage_fields() {
        let order = ExecutionOrder {
            node_ids: vec!["a".to_string()],
            parallelizable_groups: vec![vec!["a".to_string()]],
        };
        let constraints = ResourceConstraint::default();
        let plan = ResourceAwareScheduler::schedule(&order, &constraints);
        assert_eq!(plan.stages.len(), 1);
        let stage = &plan.stages[0];
        assert_eq!(stage.nodes, vec!["a"]);
        assert!(stage.estimated_cpu_threads >= 1);
        assert!(stage.estimated_memory_mb > 0);
    }

    #[test]
    fn test_resource_schedule_all_nodes_covered() {
        let mut graph = MockGraph::new();
        for id in ["a", "b", "c"] {
            graph.add_node(id);
        }
        graph.add_dep("b", "a");
        let order = TopologicalScheduler::schedule(&graph).expect("schedule should succeed");
        let constraints = ResourceConstraint::default();
        let plan = ResourceAwareScheduler::schedule(&order, &constraints);
        let covered: HashSet<String> = plan
            .stages
            .iter()
            .flat_map(|s| s.nodes.iter().cloned())
            .collect();
        assert_eq!(covered.len(), 3);
        assert!(covered.contains("a"));
        assert!(covered.contains("b"));
        assert!(covered.contains("c"));
    }
}