quantrs2_sim/automatic_parallelization/
autoparallelengine_distribute_tasks_across_nodes_group.rs1use crate::distributed_simulator::{DistributedQuantumSimulator, DistributedSimulatorConfig};
8use quantrs2_core::{
9 error::{QuantRS2Error, QuantRS2Result},
10 gate::GateOp,
11 qubit::QubitId,
12};
13use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
14use uuid::Uuid;
15
16use super::types::{NodeCapacity, ParallelTask};
17
18use super::autoparallelengine_type::AutoParallelEngine;
19
20impl AutoParallelEngine {
21 pub(super) fn distribute_tasks_across_nodes(
23 &self,
24 tasks: &[ParallelTask],
25 distributed_sim: &DistributedQuantumSimulator,
26 ) -> QuantRS2Result<Vec<Vec<ParallelTask>>> {
27 let cluster_status = distributed_sim.get_cluster_status();
28 let num_nodes = cluster_status.len();
29 if num_nodes == 0 {
30 return Ok(vec![tasks.to_vec()]);
31 }
32 let node_capacities = Self::analyze_node_capabilities(&cluster_status);
33 let mut sorted_tasks: Vec<_> = tasks.to_vec();
34 sorted_tasks.sort_by(|a, b| {
35 b.cost
36 .partial_cmp(&a.cost)
37 .unwrap_or(std::cmp::Ordering::Equal)
38 });
39 let mut distributed_tasks = vec![Vec::new(); num_nodes];
40 let mut node_loads = vec![0.0; num_nodes];
41 for task in sorted_tasks {
42 let best_node = Self::select_best_node_for_task(&task, &node_capacities, &node_loads);
43 distributed_tasks[best_node].push(task.clone());
44 node_loads[best_node] += task.cost;
45 }
46 Self::rebalance_node_distribution(
47 &mut distributed_tasks,
48 &node_capacities,
49 &mut node_loads,
50 )?;
51 Ok(distributed_tasks)
52 }
53 pub(super) fn analyze_node_capabilities(
55 cluster_status: &HashMap<Uuid, crate::distributed_simulator::NodeInfo>,
56 ) -> Vec<NodeCapacity> {
57 cluster_status
58 .values()
59 .map(|info| NodeCapacity {
60 cpu_cores: 4,
61 memory_gb: 16.0,
62 gpu_available: false,
63 network_bandwidth_gbps: 10.0,
64 relative_performance: 1.0,
65 })
66 .collect()
67 }
68 pub(super) fn select_best_node_for_task(
70 task: &ParallelTask,
71 node_capacities: &[NodeCapacity],
72 node_loads: &[f64],
73 ) -> usize {
74 let mut best_node = 0;
75 let mut best_score = f64::NEG_INFINITY;
76 for (idx, capacity) in node_capacities.iter().enumerate() {
77 let load_factor = 1.0 - (node_loads[idx] / capacity.relative_performance).min(1.0);
78 let memory_factor = if task.memory_requirement
79 < (capacity.memory_gb * 1024.0 * 1024.0 * 1024.0) as usize
80 {
81 1.0
82 } else {
83 0.5
84 };
85 let score = load_factor * capacity.relative_performance * memory_factor;
86 if score > best_score {
87 best_score = score;
88 best_node = idx;
89 }
90 }
91 best_node
92 }
93 pub(super) fn rebalance_node_distribution(
95 distributed_tasks: &mut [Vec<ParallelTask>],
96 node_capacities: &[NodeCapacity],
97 node_loads: &mut [f64],
98 ) -> QuantRS2Result<()> {
99 let total_load: f64 = node_loads.iter().sum();
100 let avg_load = total_load / node_loads.len() as f64;
101 const IMBALANCE_THRESHOLD: f64 = 0.3;
102 for _ in 0..5 {
103 let mut rebalanced = false;
104 let heavy_nodes: Vec<usize> = node_loads
105 .iter()
106 .enumerate()
107 .filter(|(_, load)| **load > avg_load * (1.0 + IMBALANCE_THRESHOLD))
108 .map(|(idx, _)| idx)
109 .collect();
110 let light_nodes: Vec<usize> = node_loads
111 .iter()
112 .enumerate()
113 .filter(|(_, load)| **load < avg_load * (1.0 - IMBALANCE_THRESHOLD))
114 .map(|(idx, _)| idx)
115 .collect();
116 for &heavy_idx in &heavy_nodes {
117 for &light_idx in &light_nodes {
118 if heavy_idx != light_idx {
119 if let Some(task) = distributed_tasks[heavy_idx].pop() {
120 node_loads[heavy_idx] -= task.cost;
121 distributed_tasks[light_idx].push(task.clone());
122 node_loads[light_idx] += task.cost;
123 rebalanced = true;
124 break;
125 }
126 }
127 }
128 if rebalanced {
129 break;
130 }
131 }
132 if !rebalanced {
133 break;
134 }
135 }
136 Ok(())
137 }
138}