Skip to main content

quantrs2_sim/automatic_parallelization/
autoparallelengine_distribute_tasks_across_nodes_group.rs

1//! # AutoParallelEngine - distribute_tasks_across_nodes_group Methods
2//!
3//! This module contains method implementations for `AutoParallelEngine`.
4//!
5//! 🤖 Generated with [SplitRS](https://github.com/cool-japan/splitrs)
6
7use crate::distributed_simulator::{DistributedQuantumSimulator, DistributedSimulatorConfig};
8use quantrs2_core::{
9    error::{QuantRS2Error, QuantRS2Result},
10    gate::GateOp,
11    qubit::QubitId,
12};
13use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
14use uuid::Uuid;
15
16use super::types::{NodeCapacity, ParallelTask};
17
18use super::autoparallelengine_type::AutoParallelEngine;
19
20impl AutoParallelEngine {
21    /// Distribute tasks across cluster nodes
22    pub(super) fn distribute_tasks_across_nodes(
23        &self,
24        tasks: &[ParallelTask],
25        distributed_sim: &DistributedQuantumSimulator,
26    ) -> QuantRS2Result<Vec<Vec<ParallelTask>>> {
27        let cluster_status = distributed_sim.get_cluster_status();
28        let num_nodes = cluster_status.len();
29        if num_nodes == 0 {
30            return Ok(vec![tasks.to_vec()]);
31        }
32        let node_capacities = Self::analyze_node_capabilities(&cluster_status);
33        let mut sorted_tasks: Vec<_> = tasks.to_vec();
34        sorted_tasks.sort_by(|a, b| {
35            b.cost
36                .partial_cmp(&a.cost)
37                .unwrap_or(std::cmp::Ordering::Equal)
38        });
39        let mut distributed_tasks = vec![Vec::new(); num_nodes];
40        let mut node_loads = vec![0.0; num_nodes];
41        for task in sorted_tasks {
42            let best_node = Self::select_best_node_for_task(&task, &node_capacities, &node_loads);
43            distributed_tasks[best_node].push(task.clone());
44            node_loads[best_node] += task.cost;
45        }
46        Self::rebalance_node_distribution(
47            &mut distributed_tasks,
48            &node_capacities,
49            &mut node_loads,
50        )?;
51        Ok(distributed_tasks)
52    }
53    /// Analyze node capabilities from cluster status
54    pub(super) fn analyze_node_capabilities(
55        cluster_status: &HashMap<Uuid, crate::distributed_simulator::NodeInfo>,
56    ) -> Vec<NodeCapacity> {
57        cluster_status
58            .values()
59            .map(|info| NodeCapacity {
60                cpu_cores: 4,
61                memory_gb: 16.0,
62                gpu_available: false,
63                network_bandwidth_gbps: 10.0,
64                relative_performance: 1.0,
65            })
66            .collect()
67    }
68    /// Select the best node for a given task
69    pub(super) fn select_best_node_for_task(
70        task: &ParallelTask,
71        node_capacities: &[NodeCapacity],
72        node_loads: &[f64],
73    ) -> usize {
74        let mut best_node = 0;
75        let mut best_score = f64::NEG_INFINITY;
76        for (idx, capacity) in node_capacities.iter().enumerate() {
77            let load_factor = 1.0 - (node_loads[idx] / capacity.relative_performance).min(1.0);
78            let memory_factor = if task.memory_requirement
79                < (capacity.memory_gb * 1024.0 * 1024.0 * 1024.0) as usize
80            {
81                1.0
82            } else {
83                0.5
84            };
85            let score = load_factor * capacity.relative_performance * memory_factor;
86            if score > best_score {
87                best_score = score;
88                best_node = idx;
89            }
90        }
91        best_node
92    }
93    /// Rebalance task distribution if needed
94    pub(super) fn rebalance_node_distribution(
95        distributed_tasks: &mut [Vec<ParallelTask>],
96        node_capacities: &[NodeCapacity],
97        node_loads: &mut [f64],
98    ) -> QuantRS2Result<()> {
99        let total_load: f64 = node_loads.iter().sum();
100        let avg_load = total_load / node_loads.len() as f64;
101        const IMBALANCE_THRESHOLD: f64 = 0.3;
102        for _ in 0..5 {
103            let mut rebalanced = false;
104            let heavy_nodes: Vec<usize> = node_loads
105                .iter()
106                .enumerate()
107                .filter(|(_, load)| **load > avg_load * (1.0 + IMBALANCE_THRESHOLD))
108                .map(|(idx, _)| idx)
109                .collect();
110            let light_nodes: Vec<usize> = node_loads
111                .iter()
112                .enumerate()
113                .filter(|(_, load)| **load < avg_load * (1.0 - IMBALANCE_THRESHOLD))
114                .map(|(idx, _)| idx)
115                .collect();
116            for &heavy_idx in &heavy_nodes {
117                for &light_idx in &light_nodes {
118                    if heavy_idx != light_idx {
119                        if let Some(task) = distributed_tasks[heavy_idx].pop() {
120                            node_loads[heavy_idx] -= task.cost;
121                            distributed_tasks[light_idx].push(task.clone());
122                            node_loads[light_idx] += task.cost;
123                            rebalanced = true;
124                            break;
125                        }
126                    }
127                }
128                if rebalanced {
129                    break;
130                }
131            }
132            if !rebalanced {
133                break;
134            }
135        }
136        Ok(())
137    }
138}