1use crate::distributed_simulator::{DistributedQuantumSimulator, DistributedSimulatorConfig};
8use crate::large_scale_simulator::{LargeScaleQuantumSimulator, LargeScaleSimulatorConfig};
9use quantrs2_circuit::builder::{Circuit, Simulator};
10use quantrs2_core::{
11 error::{QuantRS2Error, QuantRS2Result},
12 gate::GateOp,
13 qubit::QubitId,
14};
15use scirs2_core::parallel_ops::{current_num_threads, IndexedParallelIterator, ParallelIterator}; use scirs2_core::ndarray::{Array1, Array2, ArrayView1};
19use scirs2_core::Complex64;
20use serde::{Deserialize, Serialize};
21use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
22use std::sync::{Arc, Barrier, Mutex, RwLock};
23use std::thread;
24use std::time::{Duration, Instant};
25use uuid::Uuid;
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct AutoParallelConfig {
30 pub max_threads: usize,
32
33 pub min_gates_for_parallel: usize,
35
36 pub strategy: ParallelizationStrategy,
38
39 pub resource_constraints: ResourceConstraints,
41
42 pub enable_inter_layer_parallel: bool,
44
45 pub enable_gate_fusion: bool,
47
48 pub scirs2_optimization_level: OptimizationLevel,
50
51 pub load_balancing: LoadBalancingConfig,
53
54 pub enable_analysis_caching: bool,
56
57 pub memory_budget: usize,
59}
60
61impl Default for AutoParallelConfig {
62 fn default() -> Self {
63 Self {
64 max_threads: current_num_threads(), min_gates_for_parallel: 10,
66 strategy: ParallelizationStrategy::DependencyAnalysis,
67 resource_constraints: ResourceConstraints::default(),
68 enable_inter_layer_parallel: true,
69 enable_gate_fusion: true,
70 scirs2_optimization_level: OptimizationLevel::Aggressive,
71 load_balancing: LoadBalancingConfig::default(),
72 enable_analysis_caching: true,
73 memory_budget: 4 * 1024 * 1024 * 1024, }
75 }
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
80pub enum ParallelizationStrategy {
81 DependencyAnalysis,
83 LayerBased,
85 QubitPartitioning,
87 Hybrid,
89 MLGuided,
91 HardwareAware,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct ResourceConstraints {
98 pub max_memory_per_thread: usize,
100 pub max_cpu_utilization: f64,
102 pub max_gates_per_thread: usize,
104 pub preferred_numa_node: Option<usize>,
106}
107
108impl Default for ResourceConstraints {
109 fn default() -> Self {
110 Self {
111 max_memory_per_thread: 1024 * 1024 * 1024, max_cpu_utilization: 0.8,
113 max_gates_per_thread: 1000,
114 preferred_numa_node: None,
115 }
116 }
117}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct LoadBalancingConfig {
122 pub enable_dynamic_balancing: bool,
124 pub work_stealing_strategy: WorkStealingStrategy,
126 pub monitoring_interval: Duration,
128 pub rebalancing_threshold: f64,
130}
131
132impl Default for LoadBalancingConfig {
133 fn default() -> Self {
134 Self {
135 enable_dynamic_balancing: true,
136 work_stealing_strategy: WorkStealingStrategy::Adaptive,
137 monitoring_interval: Duration::from_millis(100),
138 rebalancing_threshold: 0.2,
139 }
140 }
141}
142
143#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
145pub enum WorkStealingStrategy {
146 Random,
148 CostAware,
150 LocalityAware,
152 Adaptive,
154}
155
156#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
158pub enum OptimizationLevel {
159 None,
161 Basic,
163 Advanced,
165 Aggressive,
167 Custom,
169}
170
171#[derive(Debug, Clone)]
173pub struct ParallelTask {
174 pub id: Uuid,
176 pub gates: Vec<Arc<dyn GateOp + Send + Sync>>,
178 pub qubits: HashSet<QubitId>,
180 pub cost: f64,
182 pub memory_requirement: usize,
184 pub dependencies: HashSet<Uuid>,
186 pub priority: TaskPriority,
188}
189
190#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
192pub enum TaskPriority {
193 Low = 1,
195 Normal = 2,
197 High = 3,
199 Critical = 4,
201}
202
203#[derive(Debug, Clone)]
205pub struct DependencyGraph {
206 pub nodes: Vec<GateNode>,
208 pub edges: HashMap<usize, Vec<usize>>,
210 pub reverse_edges: HashMap<usize, Vec<usize>>,
212 pub layers: Vec<Vec<usize>>,
214}
215
216#[derive(Debug, Clone)]
218pub struct GateNode {
219 pub gate_index: usize,
221 pub gate: Arc<dyn GateOp + Send + Sync>,
223 pub qubits: HashSet<QubitId>,
225 pub layer: usize,
227 pub cost: f64,
229}
230
231#[derive(Debug, Clone)]
233pub struct ParallelizationAnalysis {
234 pub tasks: Vec<ParallelTask>,
236 pub num_layers: usize,
238 pub efficiency: f64,
240 pub max_parallelism: usize,
242 pub critical_path_length: usize,
244 pub resource_utilization: ResourceUtilization,
246 pub recommendations: Vec<OptimizationRecommendation>,
248}
249
250#[derive(Debug, Clone)]
252pub struct ResourceUtilization {
253 pub cpu_utilization: Vec<f64>,
255 pub memory_usage: Vec<usize>,
257 pub load_balance_score: f64,
259 pub communication_overhead: f64,
261}
262
263#[derive(Debug, Clone)]
265pub struct OptimizationRecommendation {
266 pub recommendation_type: RecommendationType,
268 pub description: String,
270 pub expected_improvement: f64,
272 pub complexity: RecommendationComplexity,
274}
275
276#[derive(Debug, Clone, Copy, PartialEq, Eq)]
278pub enum RecommendationType {
279 GateReordering,
281 CircuitDecomposition,
283 ResourceAllocation,
285 StrategyChange,
287 HardwareConfiguration,
289}
290
291#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
293pub enum RecommendationComplexity {
294 Low,
296 Medium,
298 High,
300}
301
302#[derive(Debug, Clone)]
304pub struct MLFeatures {
305 pub num_gates: usize,
307 pub num_qubits: usize,
309 pub circuit_depth: usize,
311 pub avg_connectivity: f64,
313 pub parallelism_factor: f64,
315 pub gate_distribution: HashMap<String, usize>,
317 pub entanglement_score: f64,
319 pub dependency_density: f64,
321}
322
323#[derive(Debug, Clone, Copy, PartialEq, Eq)]
325pub enum MLPredictedStrategy {
326 HighParallelism,
328 BalancedParallelism,
330 ConservativeParallelism,
332 LayerOptimized,
334}
335
336#[derive(Debug, Clone)]
338pub struct HardwareCharacteristics {
339 pub num_cores: usize,
341 pub l1_cache_size: usize,
343 pub l2_cache_size: usize,
345 pub l3_cache_size: usize,
347 pub memory_bandwidth: f64,
349 pub num_numa_nodes: usize,
351 pub has_gpu: bool,
353 pub simd_width: usize,
355}
356
357#[derive(Debug, Clone, Copy, PartialEq, Eq)]
359pub enum HardwareStrategy {
360 CacheOptimized,
362 SIMDOptimized,
364 NUMAAware,
366 GPUOffload,
368 Hybrid,
370}
371
372#[derive(Debug, Clone)]
374pub struct NodeCapacity {
375 pub cpu_cores: usize,
377 pub memory_gb: f64,
379 pub gpu_available: bool,
381 pub network_bandwidth_gbps: f64,
383 pub relative_performance: f64,
385}
386
387pub struct AutoParallelEngine {
389 config: AutoParallelConfig,
391 analysis_cache: Arc<RwLock<HashMap<u64, ParallelizationAnalysis>>>,
393 performance_stats: Arc<Mutex<ParallelPerformanceStats>>,
395 load_balancer: Arc<Mutex<LoadBalancer>>,
399}
400
401#[derive(Debug, Clone, Default)]
403pub struct ParallelPerformanceStats {
404 pub circuits_processed: usize,
406 pub total_execution_time: Duration,
408 pub average_efficiency: f64,
410 pub cache_hit_rate: f64,
412 pub task_stats: TaskCompletionStats,
414 pub resource_history: Vec<ResourceSnapshot>,
416}
417
418#[derive(Debug, Clone, Default)]
420pub struct TaskCompletionStats {
421 pub total_tasks: usize,
423 pub average_duration: Duration,
425 pub success_rate: f64,
427 pub load_balance_effectiveness: f64,
429}
430
431#[derive(Debug, Clone)]
433pub struct ResourceSnapshot {
434 pub timestamp: Instant,
436 pub cpu_utilization: Vec<f64>,
438 pub memory_usage: usize,
440 pub active_tasks: usize,
442}
443
444pub struct LoadBalancer {
446 thread_loads: Vec<f64>,
448 task_queues: Vec<VecDeque<ParallelTask>>,
450 work_stealing_stats: WorkStealingStats,
452}
453
454#[derive(Debug, Clone, Default)]
456pub struct WorkStealingStats {
457 pub steal_attempts: usize,
459 pub successful_steals: usize,
461 pub failed_steals: usize,
463 pub average_steal_latency: Duration,
465}
466
467impl AutoParallelEngine {
468 #[must_use]
470 pub fn new(config: AutoParallelConfig) -> Self {
471 let num_threads = config.max_threads;
472
473 Self {
474 config,
475 analysis_cache: Arc::new(RwLock::new(HashMap::new())),
476 performance_stats: Arc::new(Mutex::new(ParallelPerformanceStats::default())),
477 load_balancer: Arc::new(Mutex::new(LoadBalancer::new(num_threads))),
478 }
479 }
480
481 pub fn analyze_circuit<const N: usize>(
483 &self,
484 circuit: &Circuit<N>,
485 ) -> QuantRS2Result<ParallelizationAnalysis> {
486 let start_time = Instant::now();
487
488 if self.config.enable_analysis_caching {
490 let circuit_hash = Self::compute_circuit_hash(circuit);
491 if let Some(cached_analysis) = self
492 .analysis_cache
493 .read()
494 .expect("analysis cache read lock should not be poisoned")
495 .get(&circuit_hash)
496 {
497 return Ok(cached_analysis.clone());
498 }
499 }
500
501 let dependency_graph = self.build_dependency_graph(circuit)?;
503
504 let tasks = match self.config.strategy {
506 ParallelizationStrategy::DependencyAnalysis => {
507 self.dependency_based_parallelization(&dependency_graph)?
508 }
509 ParallelizationStrategy::LayerBased => {
510 self.layer_based_parallelization(&dependency_graph)?
511 }
512 ParallelizationStrategy::QubitPartitioning => {
513 self.qubit_partitioning_parallelization(circuit, &dependency_graph)?
514 }
515 ParallelizationStrategy::Hybrid => {
516 self.hybrid_parallelization(circuit, &dependency_graph)?
517 }
518 ParallelizationStrategy::MLGuided => {
519 self.ml_guided_parallelization(circuit, &dependency_graph)?
520 }
521 ParallelizationStrategy::HardwareAware => {
522 self.hardware_aware_parallelization(circuit, &dependency_graph)?
523 }
524 };
525
526 let analysis = self.calculate_parallelization_metrics(circuit, &dependency_graph, tasks)?;
528
529 if self.config.enable_analysis_caching {
531 let circuit_hash = Self::compute_circuit_hash(circuit);
532 self.analysis_cache
533 .write()
534 .expect("analysis cache write lock should not be poisoned")
535 .insert(circuit_hash, analysis.clone());
536 }
537
538 self.update_performance_stats(start_time.elapsed(), &analysis);
540
541 Ok(analysis)
542 }
543
544 pub fn execute_parallel<const N: usize>(
546 &self,
547 circuit: &Circuit<N>,
548 simulator: &mut LargeScaleQuantumSimulator,
549 ) -> QuantRS2Result<Vec<Complex64>> {
550 let analysis = self.analyze_circuit(circuit)?;
551
552 if analysis.tasks.len() < self.config.min_gates_for_parallel {
553 return Self::execute_sequential(circuit, simulator);
555 }
556
557 let barrier = Arc::new(Barrier::new(self.config.max_threads));
559 let shared_state = Arc::new(RwLock::new(simulator.get_dense_state()?));
560 let task_results = Arc::new(Mutex::new(Vec::new()));
561
562 self.execute_parallel_tasks(&analysis.tasks, shared_state.clone(), task_results, barrier)?;
564
565 let final_state = shared_state
567 .read()
568 .expect("shared state read lock should not be poisoned")
569 .clone();
570 Ok(final_state)
571 }
572
573 pub fn execute_distributed<const N: usize>(
575 &self,
576 circuit: &Circuit<N>,
577 distributed_sim: &mut DistributedQuantumSimulator,
578 ) -> QuantRS2Result<Vec<Complex64>> {
579 let analysis = self.analyze_circuit(circuit)?;
580
581 let distributed_tasks =
583 self.distribute_tasks_across_nodes(&analysis.tasks, distributed_sim)?;
584
585 let results = self.execute_distributed_tasks(&distributed_tasks, distributed_sim)?;
588
589 let final_result = Self::aggregate_distributed_results(results)?;
591
592 Ok(final_result)
593 }
594
595 fn execute_distributed_tasks(
597 &self,
598 distributed_tasks: &[Vec<ParallelTask>],
599 distributed_sim: &DistributedQuantumSimulator,
600 ) -> QuantRS2Result<Vec<Vec<Complex64>>> {
601 use scirs2_core::parallel_ops::{parallel_map, IndexedParallelIterator, ParallelIterator};
602
603 let cluster_status = distributed_sim.get_cluster_status();
604 let num_nodes = cluster_status.len();
605
606 let node_results: Vec<Vec<Complex64>> =
608 parallel_map(&(0..num_nodes).collect::<Vec<_>>(), |&node_id| {
609 let tasks = &distributed_tasks[node_id];
610 let mut node_result = Vec::new();
611
612 for task in tasks {
614 let task_result = Self::execute_task_on_node(task, node_id);
617 node_result.extend(task_result);
618 }
619
620 node_result
621 });
622
623 Ok(node_results)
624 }
625
626 const fn execute_task_on_node(task: &ParallelTask, node_id: usize) -> Vec<Complex64> {
628 Vec::new()
637 }
638
639 fn aggregate_distributed_results(
641 node_results: Vec<Vec<Complex64>>,
642 ) -> QuantRS2Result<Vec<Complex64>> {
643 use scirs2_core::parallel_ops::{IndexedParallelIterator, ParallelIterator};
644
645 let total_elements: usize = node_results.iter().map(std::vec::Vec::len).sum();
648 let mut aggregated = Vec::with_capacity(total_elements);
649
650 for node_result in node_results {
651 aggregated.extend(node_result);
652 }
653
654 Ok(aggregated)
655 }
656
657 fn build_dependency_graph<const N: usize>(
659 &self,
660 circuit: &Circuit<N>,
661 ) -> QuantRS2Result<DependencyGraph> {
662 let gates = circuit.gates();
663 let mut nodes = Vec::with_capacity(gates.len());
664 let mut edges: HashMap<usize, Vec<usize>> = HashMap::new();
665 let mut reverse_edges: HashMap<usize, Vec<usize>> = HashMap::new();
666
667 for (i, gate) in gates.iter().enumerate() {
669 let qubits: HashSet<QubitId> = gate.qubits().into_iter().collect();
670 let cost = Self::estimate_gate_cost(gate.as_ref());
671
672 nodes.push(GateNode {
673 gate_index: i,
674 gate: gate.clone(),
675 qubits,
676 layer: 0, cost,
678 });
679
680 edges.insert(i, Vec::new());
681 reverse_edges.insert(i, Vec::new());
682 }
683
684 for i in 0..nodes.len() {
686 for j in (i + 1)..nodes.len() {
687 if !nodes[i].qubits.is_disjoint(&nodes[j].qubits) {
688 if let Some(edge_list) = edges.get_mut(&i) {
690 edge_list.push(j);
691 }
692 if let Some(reverse_edge_list) = reverse_edges.get_mut(&j) {
693 reverse_edge_list.push(i);
694 }
695 }
696 }
697 }
698
699 let layers = Self::compute_topological_layers(&nodes, &edges)?;
701
702 for (layer_idx, layer) in layers.iter().enumerate() {
704 for &node_idx in layer {
705 if let Some(node) = nodes.get_mut(node_idx) {
706 node.layer = layer_idx;
707 }
708 }
709 }
710
711 Ok(DependencyGraph {
712 nodes,
713 edges,
714 reverse_edges,
715 layers,
716 })
717 }
718
719 fn compute_topological_layers(
721 nodes: &[GateNode],
722 edges: &HashMap<usize, Vec<usize>>,
723 ) -> QuantRS2Result<Vec<Vec<usize>>> {
724 let mut in_degree: HashMap<usize, usize> = HashMap::new();
725 let mut layers = Vec::new();
726 let mut queue = VecDeque::new();
727
728 for i in 0..nodes.len() {
730 in_degree.insert(i, 0);
731 }
732
733 for to_list in edges.values() {
734 for &to in to_list {
735 if let Some(degree) = in_degree.get_mut(&to) {
736 *degree += 1;
737 }
738 }
739 }
740
741 for i in 0..nodes.len() {
743 if in_degree[&i] == 0 {
744 queue.push_back(i);
745 }
746 }
747
748 while !queue.is_empty() {
749 let mut current_layer = Vec::new();
750 let layer_size = queue.len();
751
752 for _ in 0..layer_size {
753 if let Some(node) = queue.pop_front() {
754 current_layer.push(node);
755
756 if let Some(neighbors) = edges.get(&node) {
758 for &neighbor in neighbors {
759 let new_degree = in_degree[&neighbor] - 1;
760 in_degree.insert(neighbor, new_degree);
761
762 if new_degree == 0 {
763 queue.push_back(neighbor);
764 }
765 }
766 }
767 }
768 }
769
770 if !current_layer.is_empty() {
771 layers.push(current_layer);
772 }
773 }
774
775 Ok(layers)
776 }
777
778 fn dependency_based_parallelization(
780 &self,
781 graph: &DependencyGraph,
782 ) -> QuantRS2Result<Vec<ParallelTask>> {
783 let mut tasks = Vec::new();
784
785 for layer in &graph.layers {
786 if layer.len() > 1 {
787 let chunks = self.partition_layer_into_tasks(layer, graph)?;
789
790 for chunk in chunks {
791 let task = self.create_parallel_task(chunk, graph)?;
792 tasks.push(task);
793 }
794 } else {
795 if let Some(&gate_idx) = layer.first() {
797 let task = self.create_parallel_task(vec![gate_idx], graph)?;
798 tasks.push(task);
799 }
800 }
801 }
802
803 Ok(tasks)
804 }
805
806 fn layer_based_parallelization(
808 &self,
809 graph: &DependencyGraph,
810 ) -> QuantRS2Result<Vec<ParallelTask>> {
811 let mut tasks = Vec::new();
812
813 for layer in &graph.layers {
814 let max_gates_per_task = self.config.resource_constraints.max_gates_per_thread;
816
817 for chunk in layer.chunks(max_gates_per_task) {
818 let task = self.create_parallel_task(chunk.to_vec(), graph)?;
819 tasks.push(task);
820 }
821 }
822
823 Ok(tasks)
824 }
825
826 fn qubit_partitioning_parallelization<const N: usize>(
828 &self,
829 circuit: &Circuit<N>,
830 graph: &DependencyGraph,
831 ) -> QuantRS2Result<Vec<ParallelTask>> {
832 let qubit_partitions = self.partition_qubits(circuit)?;
834 let mut tasks = Vec::new();
835
836 for partition in qubit_partitions {
837 let mut partition_gates = Vec::new();
839
840 for (i, node) in graph.nodes.iter().enumerate() {
841 if node.qubits.iter().all(|q| partition.contains(q)) {
842 partition_gates.push(i);
843 }
844 }
845
846 if !partition_gates.is_empty() {
847 let task = self.create_parallel_task(partition_gates, graph)?;
848 tasks.push(task);
849 }
850 }
851
852 Ok(tasks)
853 }
854
855 fn hybrid_parallelization<const N: usize>(
857 &self,
858 circuit: &Circuit<N>,
859 graph: &DependencyGraph,
860 ) -> QuantRS2Result<Vec<ParallelTask>> {
861 let dependency_tasks = self.dependency_based_parallelization(graph)?;
863 let layer_tasks = self.layer_based_parallelization(graph)?;
864 let partition_tasks = self.qubit_partitioning_parallelization(circuit, graph)?;
865
866 let strategies = vec![
868 ("dependency", dependency_tasks),
869 ("layer", layer_tasks),
870 ("partition", partition_tasks),
871 ];
872
873 let best_strategy = strategies.into_iter().max_by(|(_, tasks_a), (_, tasks_b)| {
874 let efficiency_a = Self::calculate_strategy_efficiency(tasks_a);
875 let efficiency_b = Self::calculate_strategy_efficiency(tasks_b);
876 efficiency_a
877 .partial_cmp(&efficiency_b)
878 .unwrap_or(std::cmp::Ordering::Equal)
879 });
880
881 match best_strategy {
882 Some((_, tasks)) => Ok(tasks),
883 None => Ok(Vec::new()),
884 }
885 }
886
887 fn ml_guided_parallelization<const N: usize>(
889 &self,
890 circuit: &Circuit<N>,
891 graph: &DependencyGraph,
892 ) -> QuantRS2Result<Vec<ParallelTask>> {
893 let features = self.extract_ml_features(circuit, graph);
896
897 let predicted_strategy = Self::predict_parallelization_strategy(&features);
899
900 let task_groups = match predicted_strategy {
902 MLPredictedStrategy::HighParallelism => {
903 self.aggressive_parallelization(graph)?
905 }
906 MLPredictedStrategy::BalancedParallelism => {
907 self.hybrid_parallelization(circuit, graph)?
909 }
910 MLPredictedStrategy::ConservativeParallelism => {
911 self.dependency_based_parallelization(graph)?
913 }
914 MLPredictedStrategy::LayerOptimized => {
915 self.layer_based_parallelization(graph)?
917 }
918 };
919
920 let optimized_tasks = self.ml_optimize_tasks(task_groups, &features)?;
922
923 Ok(optimized_tasks)
924 }
925
926 fn extract_ml_features<const N: usize>(
928 &self,
929 circuit: &Circuit<N>,
930 graph: &DependencyGraph,
931 ) -> MLFeatures {
932 let gates = circuit.gates();
933 let num_gates = gates.len();
934 let num_qubits = N;
935
936 let circuit_depth = Self::calculate_circuit_depth(graph);
938
939 let avg_connectivity = Self::calculate_average_connectivity(graph);
941
942 let parallelism_factor = Self::calculate_parallelism_factor(graph);
944
945 let gate_distribution = Self::calculate_gate_distribution(gates);
947
948 let entanglement_score = Self::estimate_entanglement_complexity(circuit);
950
951 MLFeatures {
952 num_gates,
953 num_qubits,
954 circuit_depth,
955 avg_connectivity,
956 parallelism_factor,
957 gate_distribution,
958 entanglement_score,
959 dependency_density: graph.edges.len() as f64 / num_gates as f64,
960 }
961 }
962
963 fn predict_parallelization_strategy(features: &MLFeatures) -> MLPredictedStrategy {
965 if features.parallelism_factor > 0.7 && features.avg_connectivity < 2.0 {
970 return MLPredictedStrategy::HighParallelism;
971 }
972
973 if features.circuit_depth < (features.num_gates as f64 * 0.3) as usize {
975 return MLPredictedStrategy::LayerOptimized;
976 }
977
978 if features.avg_connectivity > 3.5 || features.dependency_density > 0.6 {
980 return MLPredictedStrategy::ConservativeParallelism;
981 }
982
983 MLPredictedStrategy::BalancedParallelism
985 }
986
987 fn ml_optimize_tasks(
989 &self,
990 tasks: Vec<ParallelTask>,
991 features: &MLFeatures,
992 ) -> QuantRS2Result<Vec<ParallelTask>> {
993 let mut optimized = tasks;
995
996 Self::balance_task_loads(&mut optimized)?;
998
999 if features.num_gates < 50 {
1001 optimized = self.merge_small_tasks(optimized)?;
1002 }
1003
1004 if features.parallelism_factor > 0.6 {
1006 optimized = Self::split_large_tasks(optimized)?;
1007 }
1008
1009 Ok(optimized)
1010 }
1011
1012 fn aggressive_parallelization(
1014 &self,
1015 graph: &DependencyGraph,
1016 ) -> QuantRS2Result<Vec<ParallelTask>> {
1017 let mut tasks = Vec::new();
1018 let mut visited = vec![false; graph.nodes.len()];
1019
1020 for (idx, node) in graph.nodes.iter().enumerate() {
1022 if visited[idx] {
1023 continue;
1024 }
1025
1026 let mut parallel_group = vec![idx];
1028 visited[idx] = true;
1029
1030 for (other_idx, other_node) in graph.nodes.iter().enumerate() {
1031 if visited[other_idx] {
1032 continue;
1033 }
1034
1035 if !Self::gates_have_dependency(idx, other_idx, graph)
1037 && !Self::gates_share_qubits(&node.qubits, &other_node.qubits)
1038 {
1039 parallel_group.push(other_idx);
1040 visited[other_idx] = true;
1041 }
1042 }
1043
1044 if !parallel_group.is_empty() {
1045 tasks.push(self.create_parallel_task(parallel_group, graph)?);
1046 }
1047 }
1048
1049 Ok(tasks)
1050 }
1051
1052 fn calculate_circuit_depth(graph: &DependencyGraph) -> usize {
1054 let mut depths = vec![0; graph.nodes.len()];
1055
1056 for (idx, node) in graph.nodes.iter().enumerate() {
1058 let mut max_parent_depth = 0;
1059 if let Some(parents) = graph.reverse_edges.get(&idx) {
1060 for &parent in parents {
1061 max_parent_depth = max_parent_depth.max(depths[parent]);
1062 }
1063 }
1064 depths[idx] = max_parent_depth + 1;
1065 }
1066
1067 *depths.iter().max().unwrap_or(&0)
1068 }
1069
1070 fn calculate_average_connectivity(graph: &DependencyGraph) -> f64 {
1072 if graph.nodes.is_empty() {
1073 return 0.0;
1074 }
1075
1076 let total_connections: usize = graph.nodes.iter().map(|n| n.qubits.len()).sum();
1077 total_connections as f64 / graph.nodes.len() as f64
1078 }
1079
1080 fn calculate_parallelism_factor(graph: &DependencyGraph) -> f64 {
1082 if graph.nodes.is_empty() {
1083 return 0.0;
1084 }
1085
1086 let independent_gates = graph
1088 .nodes
1089 .iter()
1090 .enumerate()
1091 .filter(|(idx, _)| {
1092 graph
1093 .reverse_edges
1094 .get(idx)
1095 .is_none_or(std::vec::Vec::is_empty)
1096 })
1097 .count();
1098
1099 independent_gates as f64 / graph.nodes.len() as f64
1100 }
1101
1102 fn calculate_gate_distribution(
1104 gates: &[Arc<dyn GateOp + Send + Sync>],
1105 ) -> HashMap<String, usize> {
1106 let mut distribution = HashMap::new();
1107
1108 for gate in gates {
1109 let gate_type = format!("{gate:?}"); *distribution.entry(gate_type).or_insert(0) += 1;
1111 }
1112
1113 distribution
1114 }
1115
1116 fn estimate_entanglement_complexity<const N: usize>(circuit: &Circuit<N>) -> f64 {
1118 let gates = circuit.gates();
1119
1120 let two_qubit_gates = gates.iter().filter(|g| g.qubits().len() >= 2).count();
1122
1123 if gates.is_empty() {
1125 0.0
1126 } else {
1127 two_qubit_gates as f64 / gates.len() as f64
1128 }
1129 }
1130
1131 fn balance_task_loads(tasks: &mut [ParallelTask]) -> QuantRS2Result<()> {
1133 tasks.sort_by(|a, b| {
1135 b.cost
1136 .partial_cmp(&a.cost)
1137 .unwrap_or(std::cmp::Ordering::Equal)
1138 });
1139
1140 Ok(())
1142 }
1143
1144 fn merge_small_tasks(&self, tasks: Vec<ParallelTask>) -> QuantRS2Result<Vec<ParallelTask>> {
1146 let mut merged = Vec::new();
1147 let mut current_batch = Vec::new();
1148 let mut current_cost = 0.0;
1149
1150 const COST_THRESHOLD: f64 = 10.0; for task in tasks {
1153 if task.cost < COST_THRESHOLD {
1154 current_batch.push(task);
1155 if let Some(last_task) = current_batch.last() {
1156 current_cost += last_task.cost;
1157 }
1158
1159 if current_cost >= COST_THRESHOLD {
1160 merged.push(Self::merge_task_batch(current_batch)?);
1162 current_batch = Vec::new();
1163 current_cost = 0.0;
1164 }
1165 } else {
1166 merged.push(task);
1167 }
1168 }
1169
1170 if !current_batch.is_empty() {
1172 merged.push(Self::merge_task_batch(current_batch)?);
1173 }
1174
1175 Ok(merged)
1176 }
1177
1178 fn split_large_tasks(tasks: Vec<ParallelTask>) -> QuantRS2Result<Vec<ParallelTask>> {
1180 let mut split_tasks = Vec::new();
1181
1182 const COST_THRESHOLD: f64 = 100.0; for task in tasks {
1185 if task.cost > COST_THRESHOLD && task.gates.len() > 4 {
1186 let mid = task.gates.len() / 2;
1188 let (gates1, gates2) = task.gates.split_at(mid);
1189
1190 split_tasks.push(ParallelTask {
1191 id: Uuid::new_v4(),
1192 gates: gates1.to_vec(),
1193 qubits: task.qubits.clone(),
1194 cost: task.cost / 2.0,
1195 memory_requirement: task.memory_requirement / 2,
1196 dependencies: task.dependencies.clone(),
1197 priority: task.priority,
1198 });
1199
1200 split_tasks.push(ParallelTask {
1201 id: Uuid::new_v4(),
1202 gates: gates2.to_vec(),
1203 qubits: task.qubits.clone(),
1204 cost: task.cost / 2.0,
1205 memory_requirement: task.memory_requirement / 2,
1206 dependencies: HashSet::new(),
1207 priority: task.priority,
1208 });
1209 } else {
1210 split_tasks.push(task);
1211 }
1212 }
1213
1214 Ok(split_tasks)
1215 }
1216
1217 fn merge_task_batch(batch: Vec<ParallelTask>) -> QuantRS2Result<ParallelTask> {
1219 let mut merged_gates = Vec::new();
1220 let mut merged_qubits = HashSet::new();
1221 let mut merged_cost = 0.0;
1222 let mut merged_memory = 0;
1223 let mut merged_deps = HashSet::new();
1224 let mut max_priority = TaskPriority::Low;
1225
1226 for task in batch {
1227 merged_gates.extend(task.gates);
1228 merged_qubits.extend(task.qubits);
1229 merged_cost += task.cost;
1230 merged_memory += task.memory_requirement;
1231 merged_deps.extend(task.dependencies);
1232 if task.priority as u8 > max_priority as u8 {
1234 max_priority = task.priority;
1235 }
1236 }
1237
1238 Ok(ParallelTask {
1239 id: Uuid::new_v4(),
1240 gates: merged_gates,
1241 qubits: merged_qubits,
1242 cost: merged_cost,
1243 memory_requirement: merged_memory,
1244 dependencies: merged_deps,
1245 priority: max_priority,
1246 })
1247 }
1248
1249 fn gates_have_dependency(idx1: usize, idx2: usize, graph: &DependencyGraph) -> bool {
1251 if let Some(deps) = graph.reverse_edges.get(&idx2) {
1253 if deps.contains(&idx1) {
1254 return true;
1255 }
1256 }
1257
1258 if let Some(deps) = graph.reverse_edges.get(&idx1) {
1260 if deps.contains(&idx2) {
1261 return true;
1262 }
1263 }
1264
1265 false
1266 }
1267
1268 fn gates_share_qubits(qubits1: &HashSet<QubitId>, qubits2: &HashSet<QubitId>) -> bool {
1270 !qubits1.is_disjoint(qubits2)
1271 }
1272
1273 fn hardware_aware_parallelization<const N: usize>(
1275 &self,
1276 circuit: &Circuit<N>,
1277 graph: &DependencyGraph,
1278 ) -> QuantRS2Result<Vec<ParallelTask>> {
1279 let hw_char = Self::detect_hardware_characteristics();
1282
1283 let tasks = match self.select_hardware_strategy(&hw_char, circuit, graph)? {
1285 HardwareStrategy::CacheOptimized => {
1286 self.cache_optimized_parallelization(graph, &hw_char)?
1287 }
1288 HardwareStrategy::SIMDOptimized => {
1289 self.simd_optimized_parallelization(graph, &hw_char)?
1290 }
1291 HardwareStrategy::NUMAAware => self.numa_aware_parallelization(graph, &hw_char)?,
1292 HardwareStrategy::GPUOffload => {
1293 self.dependency_based_parallelization(graph)?
1295 }
1296 HardwareStrategy::Hybrid => self.hybrid_hardware_parallelization(graph, &hw_char)?,
1297 };
1298
1299 let optimized_tasks = Self::optimize_hardware_affinity(tasks, &hw_char)?;
1301
1302 Ok(optimized_tasks)
1303 }
1304
1305 fn detect_hardware_characteristics() -> HardwareCharacteristics {
1307 use scirs2_core::parallel_ops::current_num_threads;
1308
1309 let num_cores = current_num_threads();
1311
1312 let l1_cache_size = 32 * 1024; let l2_cache_size = 256 * 1024; let l3_cache_size = 8 * 1024 * 1024; let memory_bandwidth = 50.0; let num_numa_nodes = if num_cores > 32 { 2 } else { 1 };
1322
1323 let has_gpu = false; #[cfg(target_arch = "x86_64")]
1328 let simd_width = 256; #[cfg(not(target_arch = "x86_64"))]
1330 let simd_width = 128; HardwareCharacteristics {
1333 num_cores,
1334 l1_cache_size,
1335 l2_cache_size,
1336 l3_cache_size,
1337 memory_bandwidth,
1338 num_numa_nodes,
1339 has_gpu,
1340 simd_width,
1341 }
1342 }
1343
1344 fn select_hardware_strategy<const N: usize>(
1346 &self,
1347 hw_char: &HardwareCharacteristics,
1348 circuit: &Circuit<N>,
1349 graph: &DependencyGraph,
1350 ) -> QuantRS2Result<HardwareStrategy> {
1351 let gates = circuit.gates();
1352 let num_gates = gates.len();
1353
1354 if hw_char.has_gpu && num_gates > 1000 {
1356 return Ok(HardwareStrategy::GPUOffload);
1357 }
1358
1359 if hw_char.num_numa_nodes > 1 && N > 20 {
1361 return Ok(HardwareStrategy::NUMAAware);
1362 }
1363
1364 let has_many_rotation_gates = self.count_rotation_gates(gates) > num_gates / 2;
1366 if has_many_rotation_gates && hw_char.simd_width >= 256 {
1367 return Ok(HardwareStrategy::SIMDOptimized);
1368 }
1369
1370 if num_gates < 500 && N < 15 {
1372 return Ok(HardwareStrategy::CacheOptimized);
1373 }
1374
1375 Ok(HardwareStrategy::Hybrid)
1377 }
1378
1379 fn cache_optimized_parallelization(
1381 &self,
1382 graph: &DependencyGraph,
1383 hw_char: &HardwareCharacteristics,
1384 ) -> QuantRS2Result<Vec<ParallelTask>> {
1385 let max_task_size = hw_char.l2_cache_size / (16 * 2); let mut tasks = Vec::new();
1389 let mut current_group = Vec::new();
1390 let mut current_size = 0;
1391
1392 for (idx, node) in graph.nodes.iter().enumerate() {
1393 let gate_size = (1 << node.qubits.len()) * 16; if current_size + gate_size > max_task_size && !current_group.is_empty() {
1396 tasks.push(self.create_parallel_task(current_group, graph)?);
1397 current_group = Vec::new();
1398 current_size = 0;
1399 }
1400
1401 current_group.push(idx);
1402 current_size += gate_size;
1403 }
1404
1405 if !current_group.is_empty() {
1406 tasks.push(self.create_parallel_task(current_group, graph)?);
1407 }
1408
1409 Ok(tasks)
1410 }
1411
1412 fn simd_optimized_parallelization(
1414 &self,
1415 graph: &DependencyGraph,
1416 hw_char: &HardwareCharacteristics,
1417 ) -> QuantRS2Result<Vec<ParallelTask>> {
1418 let mut rotation_gates = Vec::new();
1420 let mut other_gates = Vec::new();
1421
1422 for (idx, node) in graph.nodes.iter().enumerate() {
1423 if Self::is_rotation_gate(node.gate.as_ref()) {
1424 rotation_gates.push(idx);
1425 } else {
1426 other_gates.push(idx);
1427 }
1428 }
1429
1430 let mut tasks = Vec::new();
1431
1432 let vec_width = hw_char.simd_width / 128; for chunk in rotation_gates.chunks(vec_width) {
1435 tasks.push(self.create_parallel_task(chunk.to_vec(), graph)?);
1436 }
1437
1438 for idx in other_gates {
1440 tasks.push(self.create_parallel_task(vec![idx], graph)?);
1441 }
1442
1443 Ok(tasks)
1444 }
1445
1446 fn numa_aware_parallelization(
1448 &self,
1449 graph: &DependencyGraph,
1450 hw_char: &HardwareCharacteristics,
1451 ) -> QuantRS2Result<Vec<ParallelTask>> {
1452 let num_nodes = hw_char.num_numa_nodes;
1454 let mut node_tasks: Vec<Vec<usize>> = vec![Vec::new(); num_nodes];
1455
1456 for (idx, node) in graph.nodes.iter().enumerate() {
1458 let numa_node = Self::select_numa_node(node, num_nodes);
1459 node_tasks[numa_node].push(idx);
1460 }
1461
1462 let mut tasks = Vec::new();
1463 for node_task_indices in node_tasks {
1464 if !node_task_indices.is_empty() {
1465 tasks.push(self.create_parallel_task(node_task_indices, graph)?);
1466 }
1467 }
1468
1469 Ok(tasks)
1470 }
1471
1472 fn hybrid_hardware_parallelization(
1474 &self,
1475 graph: &DependencyGraph,
1476 hw_char: &HardwareCharacteristics,
1477 ) -> QuantRS2Result<Vec<ParallelTask>> {
1478 let base_tasks = self.dependency_based_parallelization(graph)?;
1481
1482 let cache_aware_tasks = Self::refine_for_cache(base_tasks, hw_char)?;
1484
1485 if hw_char.num_numa_nodes > 1 {
1487 Self::refine_for_numa(cache_aware_tasks, hw_char)
1488 } else {
1489 Ok(cache_aware_tasks)
1490 }
1491 }
1492
1493 const fn optimize_hardware_affinity(
1495 tasks: Vec<ParallelTask>,
1496 hw_char: &HardwareCharacteristics,
1497 ) -> QuantRS2Result<Vec<ParallelTask>> {
1498 Ok(tasks)
1502 }
1503
1504 fn count_rotation_gates(&self, gates: &[Arc<dyn GateOp + Send + Sync>]) -> usize {
1506 gates
1507 .iter()
1508 .filter(|g| Self::is_rotation_gate(g.as_ref()))
1509 .count()
1510 }
1511
1512 fn is_rotation_gate(gate: &dyn GateOp) -> bool {
1514 let gate_str = format!("{gate:?}");
1515 gate_str.contains("RX") || gate_str.contains("RY") || gate_str.contains("RZ")
1516 }
1517
1518 fn select_numa_node(node: &GateNode, num_nodes: usize) -> usize {
1520 let qubit_sum: usize = node.qubits.iter().map(|q| q.0 as usize).sum();
1522 qubit_sum % num_nodes
1523 }
1524
1525 fn refine_for_cache(
1527 tasks: Vec<ParallelTask>,
1528 hw_char: &HardwareCharacteristics,
1529 ) -> QuantRS2Result<Vec<ParallelTask>> {
1530 let max_cache_size = hw_char.l2_cache_size;
1532 let mut refined = Vec::new();
1533
1534 for task in tasks {
1535 if task.memory_requirement > max_cache_size {
1536 let mid = task.gates.len() / 2;
1538 let (gates1, gates2) = task.gates.split_at(mid);
1539
1540 refined.push(ParallelTask {
1541 id: Uuid::new_v4(),
1542 gates: gates1.to_vec(),
1543 qubits: task.qubits.clone(),
1544 cost: task.cost / 2.0,
1545 memory_requirement: task.memory_requirement / 2,
1546 dependencies: task.dependencies.clone(),
1547 priority: task.priority,
1548 });
1549
1550 refined.push(ParallelTask {
1551 id: Uuid::new_v4(),
1552 gates: gates2.to_vec(),
1553 qubits: task.qubits,
1554 cost: task.cost / 2.0,
1555 memory_requirement: task.memory_requirement / 2,
1556 dependencies: HashSet::new(),
1557 priority: task.priority,
1558 });
1559 } else {
1560 refined.push(task);
1561 }
1562 }
1563
1564 Ok(refined)
1565 }
1566
1567 const fn refine_for_numa(
1569 tasks: Vec<ParallelTask>,
1570 hw_char: &HardwareCharacteristics,
1571 ) -> QuantRS2Result<Vec<ParallelTask>> {
1572 Ok(tasks)
1576 }
1577
1578 fn create_parallel_task(
1580 &self,
1581 gate_indices: Vec<usize>,
1582 graph: &DependencyGraph,
1583 ) -> QuantRS2Result<ParallelTask> {
1584 let mut gates = Vec::new();
1585 let mut qubits = HashSet::new();
1586 let mut total_cost = 0.0;
1587 let mut memory_requirement = 0;
1588
1589 for &idx in &gate_indices {
1590 if let Some(node) = graph.nodes.get(idx) {
1591 gates.push(node.gate.clone());
1592 qubits.extend(&node.qubits);
1593 total_cost += node.cost;
1594 memory_requirement += Self::estimate_gate_memory(node.gate.as_ref());
1595 }
1596 }
1597
1598 let dependencies = self.calculate_task_dependencies(&gate_indices, graph)?;
1600
1601 Ok(ParallelTask {
1602 id: Uuid::new_v4(),
1603 gates,
1604 qubits,
1605 cost: total_cost,
1606 memory_requirement,
1607 dependencies,
1608 priority: TaskPriority::Normal,
1609 })
1610 }
1611
1612 fn calculate_task_dependencies(
1614 &self,
1615 gate_indices: &[usize],
1616 graph: &DependencyGraph,
1617 ) -> QuantRS2Result<HashSet<Uuid>> {
1618 let mut dependencies = HashSet::new();
1620
1621 for &gate_idx in gate_indices {
1623 if let Some(parent_indices) = graph.reverse_edges.get(&gate_idx) {
1624 for &parent_idx in parent_indices {
1626 if !gate_indices.contains(&parent_idx) {
1629 let dep_uuid = Self::generate_gate_dependency_uuid(parent_idx);
1637 dependencies.insert(dep_uuid);
1638 }
1639 }
1640 }
1641 }
1642
1643 Ok(dependencies)
1644 }
1645
1646 fn generate_gate_dependency_uuid(gate_index: usize) -> Uuid {
1648 let namespace =
1651 Uuid::parse_str("6ba7b810-9dad-11d1-80b4-00c04fd430c8").unwrap_or_else(|_| Uuid::nil());
1652
1653 let mut bytes = [0u8; 16];
1655 let index_bytes = gate_index.to_le_bytes();
1656 bytes[0..8].copy_from_slice(&index_bytes);
1657
1658 Uuid::from_bytes(bytes)
1659 }
1660
1661 fn estimate_gate_cost(gate: &dyn GateOp) -> f64 {
1663 match gate.num_qubits() {
1664 1 => 1.0,
1665 2 => 4.0,
1666 3 => 8.0,
1667 n => (2.0_f64).powi(n as i32),
1668 }
1669 }
1670
1671 fn estimate_gate_memory(gate: &dyn GateOp) -> usize {
1673 let num_qubits = gate.num_qubits();
1674 let state_size = 1 << num_qubits;
1675 state_size * std::mem::size_of::<Complex64>()
1676 }
1677
1678 fn partition_layer_into_tasks(
1680 &self,
1681 layer: &[usize],
1682 graph: &DependencyGraph,
1683 ) -> QuantRS2Result<Vec<Vec<usize>>> {
1684 let max_gates_per_task = self.config.resource_constraints.max_gates_per_thread;
1685 let mut chunks = Vec::new();
1686
1687 for chunk in layer.chunks(max_gates_per_task) {
1688 chunks.push(chunk.to_vec());
1689 }
1690
1691 Ok(chunks)
1692 }
1693
1694 fn partition_qubits<const N: usize>(
1696 &self,
1697 circuit: &Circuit<N>,
1698 ) -> QuantRS2Result<Vec<HashSet<QubitId>>> {
1699 let mut partitions = Vec::new();
1701 let mut used_qubits = HashSet::new();
1702
1703 for i in 0..N {
1704 let qubit = QubitId::new(i as u32);
1705 if used_qubits.insert(qubit) {
1706 let mut partition = HashSet::new();
1707 partition.insert(qubit);
1708 partitions.push(partition);
1709 }
1710 }
1711
1712 Ok(partitions)
1713 }
1714
1715 fn calculate_strategy_efficiency(tasks: &[ParallelTask]) -> f64 {
1717 if tasks.is_empty() {
1718 return 0.0;
1719 }
1720
1721 let total_cost: f64 = tasks.iter().map(|t| t.cost).sum();
1722 let max_cost = tasks.iter().map(|t| t.cost).fold(0.0, f64::max);
1723
1724 if max_cost > 0.0 {
1725 total_cost / (max_cost * tasks.len() as f64)
1726 } else {
1727 0.0
1728 }
1729 }
1730
1731 fn calculate_parallelization_metrics<const N: usize>(
1733 &self,
1734 circuit: &Circuit<N>,
1735 graph: &DependencyGraph,
1736 tasks: Vec<ParallelTask>,
1737 ) -> QuantRS2Result<ParallelizationAnalysis> {
1738 let num_layers = graph.layers.len();
1739 let max_parallelism = graph
1740 .layers
1741 .iter()
1742 .map(std::vec::Vec::len)
1743 .max()
1744 .unwrap_or(1);
1745 let critical_path_length = graph.layers.len();
1746
1747 let efficiency = if circuit.num_gates() > 0 {
1748 max_parallelism as f64 / circuit.num_gates() as f64
1749 } else {
1750 0.0
1751 };
1752
1753 let resource_utilization = ResourceUtilization {
1754 cpu_utilization: vec![0.8; self.config.max_threads],
1755 memory_usage: vec![
1756 self.config.memory_budget / self.config.max_threads;
1757 self.config.max_threads
1758 ],
1759 load_balance_score: 0.85,
1760 communication_overhead: 0.1,
1761 };
1762
1763 let recommendations = self.generate_optimization_recommendations(circuit, graph, &tasks);
1764
1765 Ok(ParallelizationAnalysis {
1766 tasks,
1767 num_layers,
1768 efficiency,
1769 max_parallelism,
1770 critical_path_length,
1771 resource_utilization,
1772 recommendations,
1773 })
1774 }
1775
1776 fn generate_optimization_recommendations<const N: usize>(
1778 &self,
1779 circuit: &Circuit<N>,
1780 graph: &DependencyGraph,
1781 tasks: &[ParallelTask],
1782 ) -> Vec<OptimizationRecommendation> {
1783 let mut recommendations = Vec::new();
1784
1785 if graph.layers.iter().any(|layer| layer.len() == 1) {
1787 recommendations.push(OptimizationRecommendation {
1788 recommendation_type: RecommendationType::GateReordering,
1789 description: "Consider reordering gates to create larger parallel layers"
1790 .to_string(),
1791 expected_improvement: 0.2,
1792 complexity: RecommendationComplexity::Medium,
1793 });
1794 }
1795
1796 let task_costs: Vec<f64> = tasks.iter().map(|t| t.cost).collect();
1798 let cost_variance = Self::calculate_variance(&task_costs);
1799 if cost_variance > 0.5 {
1800 recommendations.push(OptimizationRecommendation {
1801 recommendation_type: RecommendationType::ResourceAllocation,
1802 description: "Task costs are unbalanced, consider load balancing optimization"
1803 .to_string(),
1804 expected_improvement: 0.15,
1805 complexity: RecommendationComplexity::Low,
1806 });
1807 }
1808
1809 recommendations
1810 }
1811
1812 fn calculate_variance(values: &[f64]) -> f64 {
1814 if values.is_empty() {
1815 return 0.0;
1816 }
1817
1818 let mean: f64 = values.iter().sum::<f64>() / values.len() as f64;
1819 let variance: f64 =
1820 values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
1821 variance
1822 }
1823
1824 fn execute_sequential<const N: usize>(
1826 circuit: &Circuit<N>,
1827 simulator: &LargeScaleQuantumSimulator,
1828 ) -> QuantRS2Result<Vec<Complex64>> {
1829 let result = simulator.run(circuit)?;
1831 Ok(Vec::new())
1834 }
1835
1836 fn execute_parallel_tasks(
1838 &self,
1839 tasks: &[ParallelTask],
1840 shared_state: Arc<RwLock<Vec<Complex64>>>,
1841 results: Arc<Mutex<Vec<Complex64>>>,
1842 barrier: Arc<Barrier>,
1843 ) -> QuantRS2Result<()> {
1844 use scirs2_core::parallel_ops::{parallel_map, IndexedParallelIterator};
1845
1846 let _ = parallel_map(tasks, |task| {
1849 barrier.wait();
1851
1852 let mut state = shared_state
1854 .write()
1855 .expect("Failed to acquire write lock on shared state");
1856
1857 for gate in &task.gates {
1859 let qubits = gate.qubits();
1861
1862 match qubits.len() {
1864 1 => {
1865 Self::apply_single_qubit_gate_to_state(
1867 &mut state,
1868 gate.as_ref(),
1869 qubits[0].0 as usize,
1870 );
1871 }
1872 2 => {
1873 Self::apply_two_qubit_gate_to_state(
1875 &mut state,
1876 gate.as_ref(),
1877 qubits[0].0 as usize,
1878 qubits[1].0 as usize,
1879 );
1880 }
1881 _ => {
1882 eprintln!(
1884 "Warning: {}-qubit gates not optimized for parallel execution",
1885 qubits.len()
1886 );
1887 }
1888 }
1889 }
1890
1891 barrier.wait();
1893 });
1894
1895 let final_state = shared_state
1897 .read()
1898 .expect("Failed to acquire read lock on shared state");
1899 let mut result_vec = results.lock().expect("Failed to acquire lock on results");
1900 result_vec.clone_from(&final_state);
1901
1902 Ok(())
1903 }
1904
1905 fn apply_single_qubit_gate_to_state(state: &mut [Complex64], gate: &dyn GateOp, qubit: usize) {
1907 let num_qubits = (state.len() as f64).log2() as usize;
1912 let stride = 1 << qubit;
1913
1914 for base in 0..state.len() {
1916 if (base & stride) == 0 {
1917 let idx0 = base;
1919 let idx1 = base | stride;
1920
1921 let amp0 = state[idx0];
1924 let amp1 = state[idx1];
1925
1926 state[idx0] = amp0;
1928 state[idx1] = amp1;
1929 }
1930 }
1931 }
1932
1933 fn apply_two_qubit_gate_to_state(
1935 state: &mut [Complex64],
1936 gate: &dyn GateOp,
1937 qubit1: usize,
1938 qubit2: usize,
1939 ) {
1940 let num_qubits = (state.len() as f64).log2() as usize;
1944 let stride1 = 1 << qubit1;
1945 let stride2 = 1 << qubit2;
1946
1947 for base in 0..state.len() {
1949 if (base & stride1) == 0 && (base & stride2) == 0 {
1950 let idx00 = base;
1951 let idx01 = base | stride1;
1952 let idx10 = base | stride2;
1953 let idx11 = base | stride1 | stride2;
1954
1955 let amp00 = state[idx00];
1957 let amp01 = state[idx01];
1958 let amp10 = state[idx10];
1959 let amp11 = state[idx11];
1960
1961 state[idx00] = amp00;
1963 state[idx01] = amp01;
1964 state[idx10] = amp10;
1965 state[idx11] = amp11;
1966 }
1967 }
1968 }
1969
1970 fn distribute_tasks_across_nodes(
1972 &self,
1973 tasks: &[ParallelTask],
1974 distributed_sim: &DistributedQuantumSimulator,
1975 ) -> QuantRS2Result<Vec<Vec<ParallelTask>>> {
1976 let cluster_status = distributed_sim.get_cluster_status();
1978 let num_nodes = cluster_status.len();
1979
1980 if num_nodes == 0 {
1981 return Ok(vec![tasks.to_vec()]);
1982 }
1983
1984 let node_capacities = Self::analyze_node_capabilities(&cluster_status);
1986
1987 let mut sorted_tasks: Vec<_> = tasks.to_vec();
1989 sorted_tasks.sort_by(|a, b| {
1990 b.cost
1991 .partial_cmp(&a.cost)
1992 .unwrap_or(std::cmp::Ordering::Equal)
1993 });
1994
1995 let mut distributed_tasks = vec![Vec::new(); num_nodes];
1997 let mut node_loads = vec![0.0; num_nodes];
1998
1999 for task in sorted_tasks {
2000 let best_node = Self::select_best_node_for_task(&task, &node_capacities, &node_loads);
2002
2003 distributed_tasks[best_node].push(task.clone());
2005 node_loads[best_node] += task.cost;
2006 }
2007
2008 Self::rebalance_node_distribution(
2010 &mut distributed_tasks,
2011 &node_capacities,
2012 &mut node_loads,
2013 )?;
2014
2015 Ok(distributed_tasks)
2016 }
2017
2018 fn analyze_node_capabilities(
2020 cluster_status: &HashMap<Uuid, crate::distributed_simulator::NodeInfo>,
2021 ) -> Vec<NodeCapacity> {
2022 cluster_status
2023 .values()
2024 .map(|info| {
2025 NodeCapacity {
2028 cpu_cores: 4, memory_gb: 16.0, gpu_available: false, network_bandwidth_gbps: 10.0, relative_performance: 1.0, }
2034 })
2035 .collect()
2036 }
2037
2038 fn select_best_node_for_task(
2040 task: &ParallelTask,
2041 node_capacities: &[NodeCapacity],
2042 node_loads: &[f64],
2043 ) -> usize {
2044 let mut best_node = 0;
2045 let mut best_score = f64::NEG_INFINITY;
2046
2047 for (idx, capacity) in node_capacities.iter().enumerate() {
2048 let load_factor = 1.0 - (node_loads[idx] / capacity.relative_performance).min(1.0);
2054 let memory_factor = if task.memory_requirement
2055 < (capacity.memory_gb * 1024.0 * 1024.0 * 1024.0) as usize
2056 {
2057 1.0
2058 } else {
2059 0.5 };
2061
2062 let score = load_factor * capacity.relative_performance * memory_factor;
2063
2064 if score > best_score {
2065 best_score = score;
2066 best_node = idx;
2067 }
2068 }
2069
2070 best_node
2071 }
2072
2073 fn rebalance_node_distribution(
2075 distributed_tasks: &mut [Vec<ParallelTask>],
2076 node_capacities: &[NodeCapacity],
2077 node_loads: &mut [f64],
2078 ) -> QuantRS2Result<()> {
2079 let total_load: f64 = node_loads.iter().sum();
2081 let avg_load = total_load / node_loads.len() as f64;
2082
2083 const IMBALANCE_THRESHOLD: f64 = 0.3; for _ in 0..5 {
2087 let mut rebalanced = false;
2089
2090 let heavy_nodes: Vec<usize> = node_loads
2092 .iter()
2093 .enumerate()
2094 .filter(|(_, load)| **load > avg_load * (1.0 + IMBALANCE_THRESHOLD))
2095 .map(|(idx, _)| idx)
2096 .collect();
2097
2098 let light_nodes: Vec<usize> = node_loads
2099 .iter()
2100 .enumerate()
2101 .filter(|(_, load)| **load < avg_load * (1.0 - IMBALANCE_THRESHOLD))
2102 .map(|(idx, _)| idx)
2103 .collect();
2104
2105 for &heavy_idx in &heavy_nodes {
2106 for &light_idx in &light_nodes {
2107 if heavy_idx != light_idx {
2108 if let Some(task) = distributed_tasks[heavy_idx].pop() {
2110 node_loads[heavy_idx] -= task.cost;
2111 distributed_tasks[light_idx].push(task.clone());
2112 node_loads[light_idx] += task.cost;
2113 rebalanced = true;
2114 break;
2115 }
2116 }
2117 }
2118 if rebalanced {
2119 break;
2120 }
2121 }
2122
2123 if !rebalanced {
2124 break; }
2126 }
2127
2128 Ok(())
2129 }
2130
2131 fn compute_circuit_hash<const N: usize>(circuit: &Circuit<N>) -> u64 {
2133 use std::collections::hash_map::DefaultHasher;
2134 use std::hash::{Hash, Hasher};
2135
2136 let mut hasher = DefaultHasher::new();
2137
2138 circuit.num_gates().hash(&mut hasher);
2140 circuit.num_qubits().hash(&mut hasher);
2141
2142 for gate in circuit.gates() {
2144 gate.name().hash(&mut hasher);
2145 gate.qubits().len().hash(&mut hasher);
2146 }
2147
2148 hasher.finish()
2149 }
2150
2151 fn update_performance_stats(
2153 &self,
2154 execution_time: Duration,
2155 analysis: &ParallelizationAnalysis,
2156 ) {
2157 let mut stats = self
2158 .performance_stats
2159 .lock()
2160 .expect("performance stats mutex should not be poisoned");
2161 stats.circuits_processed += 1;
2162 stats.total_execution_time += execution_time;
2163 stats.average_efficiency = stats
2164 .average_efficiency
2165 .mul_add((stats.circuits_processed - 1) as f64, analysis.efficiency)
2166 / stats.circuits_processed as f64;
2167 }
2168}
2169
2170impl LoadBalancer {
2171 #[must_use]
2173 pub fn new(num_threads: usize) -> Self {
2174 Self {
2175 thread_loads: vec![0.0; num_threads],
2176 task_queues: vec![VecDeque::new(); num_threads],
2177 work_stealing_stats: WorkStealingStats::default(),
2178 }
2179 }
2180
2181 pub fn balance_load(&mut self, tasks: Vec<ParallelTask>) -> Vec<Vec<ParallelTask>> {
2183 let mut balanced_tasks = vec![Vec::new(); self.thread_loads.len()];
2184
2185 for (i, task) in tasks.into_iter().enumerate() {
2187 let thread_index = i % self.thread_loads.len();
2188 balanced_tasks[thread_index].push(task);
2189 }
2190
2191 balanced_tasks
2192 }
2193}
2194
2195pub fn benchmark_automatic_parallelization<const N: usize>(
2197 circuits: Vec<Circuit<N>>,
2198 config: AutoParallelConfig,
2199) -> QuantRS2Result<AutoParallelBenchmarkResults> {
2200 let engine = AutoParallelEngine::new(config);
2201 let mut results = Vec::new();
2202 let start_time = Instant::now();
2203
2204 for circuit in circuits {
2205 let analysis_start = Instant::now();
2206 let analysis = engine.analyze_circuit(&circuit)?;
2207 let analysis_time = analysis_start.elapsed();
2208
2209 results.push(CircuitParallelResult {
2210 circuit_size: circuit.num_gates(),
2211 num_qubits: circuit.num_qubits(),
2212 analysis_time,
2213 efficiency: analysis.efficiency,
2214 max_parallelism: analysis.max_parallelism,
2215 num_tasks: analysis.tasks.len(),
2216 });
2217 }
2218
2219 let total_time = start_time.elapsed();
2220
2221 Ok(AutoParallelBenchmarkResults {
2222 total_time,
2223 average_efficiency: results.iter().map(|r| r.efficiency).sum::<f64>()
2224 / results.len() as f64,
2225 average_parallelism: results.iter().map(|r| r.max_parallelism).sum::<usize>()
2226 / results.len(),
2227 circuit_results: results,
2228 })
2229}
2230
2231#[derive(Debug, Clone)]
2233pub struct AutoParallelBenchmarkResults {
2234 pub total_time: Duration,
2236 pub circuit_results: Vec<CircuitParallelResult>,
2238 pub average_efficiency: f64,
2240 pub average_parallelism: usize,
2242}
2243
2244#[derive(Debug, Clone)]
2246pub struct CircuitParallelResult {
2247 pub circuit_size: usize,
2249 pub num_qubits: usize,
2251 pub analysis_time: Duration,
2253 pub efficiency: f64,
2255 pub max_parallelism: usize,
2257 pub num_tasks: usize,
2259}