fusion-blossom 0.2.2

A fast minimum-weight perfect matching solver for quantum error correction
Documentation
//! Serial Dual Parallel
//! 
//! A parallel implementation of the dual module, leveraging the serial version
//! 
//! While it assumes single machine (using async runtime of Rust), the design targets distributed version
//! that can spawn on different machines efficiently. The distributed version can be build based on this 
//! parallel version.
//! 
//! Notes:
//! 
//! According to https://tokio.rs/tokio/tutorial, tokio is not good for parallel computation. It suggests
//! using https://docs.rs/rayon/latest/rayon/. 
//!

use super::util::*;
use std::sync::{Arc, Weak};
use super::dual_module::*;
use super::dual_module_serial::*;
use crate::serde_json;
use serde::{Serialize, Deserialize};
use super::visualize::*;
use crate::rayon::prelude::*;
use std::collections::{BTreeSet, HashSet};
use super::complete_graph::CompleteGraph;
use crate::weak_table::PtrWeakHashSet;
use super::pointers::*;


pub struct DualModuleParallel<SerialModule: DualModuleImpl + Send + Sync> {
    /// the basic wrapped serial modules at the beginning, afterwards the fused units are appended after them
    pub units: Vec<ArcManualSafeLock<DualModuleParallelUnit<SerialModule>>>,
    /// local configuration
    pub config: DualModuleParallelConfig,
    /// partition information generated by the config
    pub partition_info: Arc<PartitionInfo>,
    /// thread pool used to execute async functions in parallel
    pub thread_pool: Arc<rayon::ThreadPool>,
    /// an empty sync requests queue just to implement the trait
    pub empty_sync_request: Vec<SyncRequest>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct DualModuleParallelConfig {
    /// enable async execution of dual operations; only used when calling top-level operations, not used in individual units
    #[serde(default = "dual_module_parallel_default_configs::thread_pool_size")]
    pub thread_pool_size: usize,
    /// strategy of edges placement: if edges are placed in the fusion unit, it's good for software implementation because there are no duplicate
    /// edges and no unnecessary vertices in the descendant units. On the other hand, it's not very favorable if implemented on hardware: the 
    /// fusion unit usually contains a very small amount of vertices and edges for the interfacing between two blocks, but maintaining this small graph
    /// may consume additional hardware resources and increase the decoding latency. I want the algorithm to finally work on the hardware efficiently
    /// so I need to verify that it does work by holding all the fusion unit's owned vertices and edges in the descendants, although usually duplicated.
    #[serde(default = "dual_module_parallel_default_configs::edges_in_fusion_unit")]
    pub edges_in_fusion_unit: bool,
    /// enable parallel execution of a fused dual module
    #[serde(default = "dual_module_parallel_default_configs::enable_parallel_execution")]
    pub enable_parallel_execution: bool,
}

impl Default for DualModuleParallelConfig {
    fn default() -> Self { serde_json::from_value(json!({})).unwrap() }
}

pub mod dual_module_parallel_default_configs {
    pub fn thread_pool_size() -> usize { 0 }  // by default to the number of CPU cores
    // pub fn thread_pool_size() -> usize { 1 }  // debug: use a single core
    pub fn edges_in_fusion_unit() -> bool { true }  // by default use the software-friendly approach because of removing duplicate edges
    pub fn enable_parallel_execution() -> bool { false }  // by default disabled: parallel execution may cause too much context switch, yet not much speed benefit
}

pub struct DualModuleParallelUnit<SerialModule: DualModuleImpl + Send + Sync> {
    /// the index
    pub unit_index: usize,
    /// partition information generated by the config
    pub partition_info: Arc<PartitionInfo>,
    /// information shared with serial module
    pub partition_unit: PartitionUnitPtr,
    /// whether it's active or not; some units are "placeholder" units that are not active until they actually fuse their children
    pub is_active: bool,
    /// the vertex range of this parallel unit consists of all the owning_range of its descendants
    pub whole_range: VertexRange,
    /// the vertices owned by this unit, note that owning_range is a subset of whole_range
    pub owning_range: VertexRange,
    /// the vertices that are mirrored outside of whole_range, in order to propagate a vertex's sync event to every unit that mirrors it
    pub extra_descendant_mirrored_vertices: HashSet<VertexIndex>,
    /// the owned serial dual module
    pub serial_module: SerialModule,
    /// left and right children dual modules
    pub children: Option<(DualModuleParallelUnitWeak<SerialModule>, DualModuleParallelUnitWeak<SerialModule>)>,
    /// parent dual module
    pub parent: Option<DualModuleParallelUnitWeak<SerialModule>>,
    /// elevated dual nodes: whose descendent not on the representative path of a dual node
    pub elevated_dual_nodes: PtrWeakHashSet<DualNodeWeak>,
    /// an empty sync requests queue just to implement the trait
    pub empty_sync_request: Vec<SyncRequest>,
    /// run things in thread pool
    pub enable_parallel_execution: bool,
    /// whether any descendant unit has active dual node
    pub has_active_node: bool,
}

pub type DualModuleParallelUnitPtr<SerialModule> = ArcManualSafeLock<DualModuleParallelUnit<SerialModule>>;
pub type DualModuleParallelUnitWeak<SerialModule> = WeakManualSafeLock<DualModuleParallelUnit<SerialModule>>;

impl<SerialModule: DualModuleImpl + Send + Sync> std::fmt::Debug for DualModuleParallelUnitPtr<SerialModule> {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let unit = self.read_recursive();
        write!(f, "{}", unit.unit_index)
    }
}

impl<SerialModule: DualModuleImpl + Send + Sync> std::fmt::Debug for DualModuleParallelUnitWeak<SerialModule> {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        self.upgrade_force().fmt(f)
    }
}


impl<SerialModule: DualModuleImpl + Send + Sync> DualModuleParallel<SerialModule> {

    /// recommended way to create a new instance, given a customized configuration
    pub fn new_config(initializer: &SolverInitializer, partition_info: &PartitionInfo, config: DualModuleParallelConfig) -> Self {
        let partition_info = Arc::new(partition_info.clone());
        let mut thread_pool_builder = rayon::ThreadPoolBuilder::new();
        if config.thread_pool_size != 0 {
            thread_pool_builder = thread_pool_builder.num_threads(config.thread_pool_size);
        }
        let thread_pool = thread_pool_builder.build().expect("creating thread pool failed");
        let mut units = vec![];
        let unit_count = partition_info.units.len();
        let complete_graph = CompleteGraph::new(initializer.vertex_num, &initializer.weighted_edges);  // build the graph to construct the NN data structure
        let mut contained_vertices_vec: Vec<BTreeSet<VertexIndex>> = vec![];  // all vertices maintained by each unit
        let mut is_vertex_virtual: Vec<_> = (0..initializer.vertex_num).map(|_| false).collect();
        for virtual_vertex in initializer.virtual_vertices.iter() {
            is_vertex_virtual[*virtual_vertex as usize] = true;
        }
        let partition_units: Vec<PartitionUnitPtr> = (0..unit_count).map(|unit_index| PartitionUnitPtr::new_value(PartitionUnit {
            unit_index,
            enabled: unit_index < partition_info.config.partitions.len(),
        })).collect();
        let mut partitioned_initializers: Vec<PartitionedSolverInitializer> = (0..unit_count).map(|unit_index| {
            let mut interfaces = vec![];
            let mut current_index = unit_index;
            let owning_range = &partition_info.units[unit_index].owning_range;
            let mut contained_vertices = BTreeSet::new();
            for vertex_index in owning_range.iter() {
                contained_vertices.insert(vertex_index);
            }
            while let Some(parent_index) = &partition_info.units[current_index].parent {
                let mut mirror_vertices = vec![];
                if config.edges_in_fusion_unit {
                    for vertex_index in partition_info.units[*parent_index].owning_range.iter() {
                        let mut is_incident = false;
                        for (peer_index, _) in complete_graph.vertices[vertex_index as usize].edges.iter() {
                            if owning_range.contains(*peer_index) {
                                is_incident = true;
                                break
                            }
                        }
                        if is_incident {
                            mirror_vertices.push((vertex_index, is_vertex_virtual[vertex_index as usize]));
                            contained_vertices.insert(vertex_index);
                        }
                    }
                } else {
                    // first check if there EXISTS any vertex that's adjacent of it's contains vertex
                    let mut has_incident = false;
                    for vertex_index in partition_info.units[*parent_index].owning_range.iter() {
                        for (peer_index, _) in complete_graph.vertices[vertex_index as usize].edges.iter() {
                            if contained_vertices.contains(peer_index) {  // important diff: as long as it has an edge with contained vertex, add it
                                has_incident = true;
                                break
                            }
                        }
                        if has_incident {
                            break
                        }
                    }
                    if has_incident {
                        // add all vertices as mirrored
                        for vertex_index in partition_info.units[*parent_index].owning_range.iter() {
                            mirror_vertices.push((vertex_index, is_vertex_virtual[vertex_index as usize]));
                            contained_vertices.insert(vertex_index);
                        }
                    }
                }
                if !mirror_vertices.is_empty() {  // only add non-empty mirrored parents is enough
                    interfaces.push((partition_units[*parent_index].downgrade(), mirror_vertices));
                }
                current_index = *parent_index;
            }
            contained_vertices_vec.push(contained_vertices);
            PartitionedSolverInitializer {
                unit_index,
                vertex_num: initializer.vertex_num,
                edge_num: initializer.weighted_edges.len(),
                owning_range: *owning_range,
                owning_interface: if unit_index < partition_info.config.partitions.len() { None } else { Some(partition_units[unit_index].downgrade()) },
                weighted_edges: vec![],  // to be filled later
                interfaces,
                virtual_vertices: owning_range.iter().filter(|vertex_index| is_vertex_virtual[*vertex_index as usize]).collect(),
            }  // note that all fields can be modified later
        }).collect();
        // assign each edge to its unique partition
        for (edge_index, &(i, j, weight)) in initializer.weighted_edges.iter().enumerate() {
            assert_ne!(i, j, "invalid edge from and to the same vertex {}", i);
            assert!(i < initializer.vertex_num, "edge ({}, {}) connected to an invalid vertex {}", i, j, i);
            assert!(j < initializer.vertex_num, "edge ({}, {}) connected to an invalid vertex {}", i, j, j);
            let i_unit_index = partition_info.vertex_to_owning_unit[i as usize];
            let j_unit_index = partition_info.vertex_to_owning_unit[j as usize];
            // either left is ancestor of right or right is ancestor of left, otherwise the edge is invalid (because crossing two independent partitions)
            let is_i_ancestor = partition_info.units[i_unit_index].descendants.contains(&j_unit_index);
            let is_j_ancestor = partition_info.units[j_unit_index].descendants.contains(&i_unit_index);
            assert!(is_i_ancestor || is_j_ancestor || i_unit_index == j_unit_index, "violating edge ({}, {}) crossing two independent partitions {} and {}"
                , i, j, i_unit_index, j_unit_index);
            let ancestor_unit_index = if is_i_ancestor { i_unit_index } else { j_unit_index };
            let descendant_unit_index = if is_i_ancestor { j_unit_index } else { i_unit_index };
            if config.edges_in_fusion_unit {
                // the edge should be added to the descendant, and it's guaranteed that the descendant unit contains (although not necessarily owned) the vertex
                partitioned_initializers[descendant_unit_index].weighted_edges.push((i, j, weight, edge_index as EdgeIndex));
            } else {
                // add edge to every unit from the descendant (including) and the ancestor (excluding) who mirrored the vertex
                if ancestor_unit_index < partition_info.config.partitions.len() {
                    // leaf unit holds every unit
                    partitioned_initializers[descendant_unit_index].weighted_edges.push((i, j, weight, edge_index as EdgeIndex));
                } else {
                    // iterate every leaf unit of the `descendant_unit_index` to see if adding the edge or not
                    struct DfsInfo<'a> {
                        partition_config: &'a PartitionConfig,
                        partition_info: &'a PartitionInfo,
                        i: VertexIndex,
                        j: VertexIndex,
                        weight: Weight,
                        contained_vertices_vec: &'a Vec<BTreeSet<VertexIndex>>,
                        edge_index: EdgeIndex,
                    }
                    let dfs_info = DfsInfo {
                        partition_config: &partition_info.config, partition_info: &partition_info
                        , i, j, weight, contained_vertices_vec: &contained_vertices_vec, edge_index: edge_index as EdgeIndex,
                    };
                    fn dfs_add(unit_index: usize, dfs_info: &DfsInfo, partitioned_initializers: &mut Vec<PartitionedSolverInitializer>) {
                        if unit_index >= dfs_info.partition_config.partitions.len() {
                            let (left_index, right_index) = &dfs_info.partition_info.units[unit_index].children.expect("fusion unit must have children");
                            dfs_add(*left_index, dfs_info, partitioned_initializers);
                            dfs_add(*right_index, dfs_info, partitioned_initializers);
                        } else {
                            let contain_i = dfs_info.contained_vertices_vec[unit_index].contains(&dfs_info.i);
                            let contain_j = dfs_info.contained_vertices_vec[unit_index].contains(&dfs_info.j);
                            assert!(!(contain_i ^ contain_j), "{} and {} must either be both contained or not contained by {}", dfs_info.i, dfs_info.j, unit_index);
                            if contain_i {
                                partitioned_initializers[unit_index].weighted_edges.push((dfs_info.i, dfs_info.j, dfs_info.weight, dfs_info.edge_index));
                            }
                        }
                    }
                    dfs_add(descendant_unit_index, &dfs_info, &mut partitioned_initializers);
                }
            }
        }
        // println!("partitioned_initializers: {:?}", partitioned_initializers);
        thread_pool.scope(|_| {
            (0..unit_count).into_par_iter().map(|unit_index| {
                // println!("unit_index: {unit_index}");
                let dual_module = SerialModule::new_partitioned(&partitioned_initializers[unit_index]);
                DualModuleParallelUnitPtr::new_wrapper(dual_module, unit_index, Arc::clone(&partition_info), partition_units[unit_index].clone()
                    , config.enable_parallel_execution)
            }).collect_into_vec(&mut units);
        });
        // fill in the children and parent references
        for unit_index in 0..unit_count {
            let mut unit = units[unit_index].write();
            if let Some((left_children_index, right_children_index)) = &partition_info.units[unit_index].children {
                unit.children = Some((units[*left_children_index].downgrade(), units[*right_children_index].downgrade()))
            }
            if let Some(parent_index) = &partition_info.units[unit_index].parent {
                unit.parent = Some(units[*parent_index].downgrade());
            }
        }
        // fill in the extra_descendant_mirrored_vertices
        for unit_index in 0..unit_count {
            lock_write!(unit, units[unit_index]);
            let whole_range = &partition_info.units[unit_index].whole_range;
            let partitioned_initializer = &partitioned_initializers[unit_index];
            for (_, interface_vertices) in partitioned_initializer.interfaces.iter() {
                for (vertex_index, _) in interface_vertices.iter() {
                    if !whole_range.contains(*vertex_index) {
                        unit.extra_descendant_mirrored_vertices.insert(*vertex_index);
                    }
                }
            }
            if let Some((left_children_weak, right_children_weak)) = unit.children.clone() {
                for child_weak in [left_children_weak, right_children_weak] {
                    // note: although iterating over HashSet is not performance optimal, this only happens at initialization and thus it's fine
                    for vertex_index in child_weak.upgrade_force().read_recursive().extra_descendant_mirrored_vertices.iter() {
                        if !whole_range.contains(*vertex_index) {
                            unit.extra_descendant_mirrored_vertices.insert(*vertex_index);
                        }
                    }
                }
            }
            // println!("{} extra_descendant_mirrored_vertices: {:?}", unit.unit_index, unit.extra_descendant_mirrored_vertices);
        }
        Self {
            units,
            config,
            partition_info,
            thread_pool: Arc::new(thread_pool),
            empty_sync_request: vec![],
        }
    }

    /// find the active ancestor to handle this dual node (should be unique, i.e. any time only one ancestor is active)
    #[inline(never)]
    pub fn find_active_ancestor(&self, dual_node_ptr: &DualNodePtr) -> DualModuleParallelUnitPtr<SerialModule> {
        self.find_active_ancestor_option(dual_node_ptr).unwrap()
    }

    pub fn find_active_ancestor_option(&self, dual_node_ptr: &DualNodePtr) -> Option<DualModuleParallelUnitPtr<SerialModule>> {
        // find the first active ancestor unit that should handle this dual node
        let representative_vertex = dual_node_ptr.get_representative_vertex();
        let owning_unit_index = self.partition_info.vertex_to_owning_unit[representative_vertex as usize];
        let mut owning_unit_ptr = self.units[owning_unit_index].clone();
        loop {
            let owning_unit = owning_unit_ptr.read_recursive();
            if owning_unit.is_active {
                break  // find an active unit
            }
            if let Some(parent_weak) = &owning_unit.parent {
                let parent_owning_unit_ptr = parent_weak.upgrade_force();
                drop(owning_unit);
                owning_unit_ptr = parent_owning_unit_ptr;
            } else {
                return None
            }
        }
        Some(owning_unit_ptr)
    }

    /// statically fuse them all, may be called at any state (meaning each unit may not necessarily be solved locally)
    pub fn static_fuse_all(&mut self) {
        for unit_ptr in self.units.iter() {
            lock_write!(unit, unit_ptr);
            if let Some((left_child_weak, right_child_weak)) = &unit.children {
                {  // ignore already fused children and work on others
                    let left_child_ptr = left_child_weak.upgrade_force();
                    let right_child_ptr = right_child_weak.upgrade_force();
                    let left_child = left_child_ptr.read_recursive();
                    let right_child = right_child_ptr.read_recursive();
                    if !left_child.is_active && !right_child.is_active {
                        continue  // already fused, it's ok to just ignore
                    }
                    debug_assert!(left_child.is_active && right_child.is_active, "children must be active at the same time if fusing all together");
                }
                unit.static_fuse();
            }
        }
    }

}

impl<SerialModule: DualModuleImpl + Send + Sync> DualModuleImpl for DualModuleParallel<SerialModule> {

    /// initialize the dual module, which is supposed to be reused for multiple decoding tasks with the same structure
    fn new_empty(initializer: &SolverInitializer) -> Self {
        Self::new_config(initializer, &PartitionConfig::new(initializer.vertex_num).info(), DualModuleParallelConfig::default())
    }

    /// clear all growth and existing dual nodes
    #[inline(never)]
    fn clear(&mut self) {
        self.thread_pool.scope(|_| {
            self.units.par_iter().enumerate().for_each(|(unit_idx, unit_ptr)| {
                lock_write!(unit, unit_ptr);
                unit.clear();
                unit.is_active = unit_idx < self.partition_info.config.partitions.len();  // only partitioned serial modules are active at the beginning
                unit.partition_unit.write().enabled = false;
                unit.elevated_dual_nodes.clear();
            });
        })
    }

    // although not the intended way to use it, we do support these common APIs for compatibility with normal primal modules

    fn add_dual_node(&mut self, dual_node_ptr: &DualNodePtr) {
        let unit_ptr = self.find_active_ancestor(dual_node_ptr);
        self.thread_pool.scope(|_| {
            lock_write!(unit, unit_ptr);
            unit.add_dual_node(dual_node_ptr);
        })
    }

    fn remove_blossom(&mut self, dual_node_ptr: DualNodePtr) {
        let unit_ptr = self.find_active_ancestor(&dual_node_ptr);
        self.thread_pool.scope(|_| {
            lock_write!(unit, unit_ptr);
            unit.remove_blossom(dual_node_ptr);
        })
    }

    fn set_grow_state(&mut self, dual_node_ptr: &DualNodePtr, grow_state: DualNodeGrowState) {
        let unit_ptr = self.find_active_ancestor(dual_node_ptr);
        self.thread_pool.scope(|_| {
            lock_write!(unit, unit_ptr);
            unit.set_grow_state(dual_node_ptr, grow_state);
        })
    }

    fn compute_maximum_update_length_dual_node(&mut self, dual_node_ptr: &DualNodePtr, is_grow: bool, simultaneous_update: bool) -> MaxUpdateLength {
        let unit_ptr = self.find_active_ancestor(dual_node_ptr);
        self.thread_pool.scope(|_| {
            lock_write!(unit, unit_ptr);
            unit.compute_maximum_update_length_dual_node(dual_node_ptr, is_grow, simultaneous_update)
        })
    }

    fn compute_maximum_update_length(&mut self) -> GroupMaxUpdateLength {
        self.thread_pool.scope(|_| {
            let results: Vec<_> = self.units.par_iter().filter_map(|unit_ptr| {
                lock_write!(unit, unit_ptr);
                if !unit.is_active { return None }
                Some(unit.compute_maximum_update_length())
            }).collect();
            let mut group_max_update_length = GroupMaxUpdateLength::new();
            for local_group_max_update_length in results.into_iter() {
                group_max_update_length.extend(local_group_max_update_length);
            }
            group_max_update_length
        })
    }

    fn grow_dual_node(&mut self, dual_node_ptr: &DualNodePtr, length: Weight) {
        let unit_ptr = self.find_active_ancestor(dual_node_ptr);
        self.thread_pool.scope(|_| {
            lock_write!(unit, unit_ptr);
            unit.grow_dual_node(dual_node_ptr, length);
        })
    }

    fn grow(&mut self, length: Weight) {
        self.thread_pool.scope(|_| {
            self.units.par_iter().for_each(|unit_ptr| {
                lock_write!(unit, unit_ptr);
                if !unit.is_active { return }
                unit.grow(length);
            });
        })
    }

    fn load_edge_modifier(&mut self, edge_modifier: &[(EdgeIndex, Weight)]) {
        self.thread_pool.scope(|_| {
            self.units.par_iter().for_each(|unit_ptr| {
                lock_write!(unit, unit_ptr);
                if !unit.is_active { return }
                unit.load_edge_modifier(edge_modifier);
            });
        })
    }

    fn prepare_nodes_shrink(&mut self, nodes_circle: &[DualNodePtr]) -> &mut Vec<SyncRequest> {
        let unit_ptr = self.find_active_ancestor(&nodes_circle[0]);
        self.thread_pool.scope(|_| {
            lock_write!(unit, unit_ptr);
            unit.prepare_nodes_shrink(nodes_circle);
        });
        &mut self.empty_sync_request
    }

}

impl<SerialModule: DualModuleImpl + Send + Sync> DualModuleParallelImpl for DualModuleParallel<SerialModule> {

    type UnitType = DualModuleParallelUnit<SerialModule>;

    fn get_unit(&self, unit_index: usize) -> ArcManualSafeLock<Self::UnitType> {
        self.units[unit_index].clone()
    }

}


/*
Implementing visualization functions
*/

impl<SerialModule: DualModuleImpl + FusionVisualizer + Send + Sync> FusionVisualizer for DualModuleParallel<SerialModule> {
    fn snapshot(&self, abbrev: bool) -> serde_json::Value {
        // do the sanity check first before taking snapshot
        // self.sanity_check().unwrap();
        let mut value = json!({});
        for unit_ptr in self.units.iter() {
            let unit = unit_ptr.read_recursive();
            if !unit.is_active { continue }  // do not visualize inactive units
            let value_2 = unit.snapshot(abbrev);
            snapshot_combine_values(&mut value, value_2, abbrev);
        }
        value
    }
}

impl<SerialModule: DualModuleImpl + FusionVisualizer + Send + Sync> FusionVisualizer for DualModuleParallelUnit<SerialModule> {
    fn snapshot(&self, abbrev: bool) -> serde_json::Value {
        let mut value = self.serial_module.snapshot(abbrev);
        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
            snapshot_combine_values(&mut value, left_child_weak.upgrade_force().read_recursive().snapshot(abbrev), abbrev);
            snapshot_combine_values(&mut value, right_child_weak.upgrade_force().read_recursive().snapshot(abbrev), abbrev);
        }
        value
    }
}

impl<SerialModule: DualModuleImpl + Send + Sync> DualModuleParallelUnit<SerialModule> {

    /// statically fuse the children of this unit
    pub fn static_fuse(&mut self) {
        debug_assert!(!self.is_active, "cannot fuse the child an already active unit");
        let (left_child_ptr, right_child_ptr) = (self.children.as_ref().unwrap().0.upgrade_force(), self.children.as_ref().unwrap().1.upgrade_force());
        let mut left_child = left_child_ptr.write();
        let mut right_child = right_child_ptr.write();
        debug_assert!(left_child.is_active && right_child.is_active, "cannot fuse inactive pairs");
        // update active state
        self.is_active = true;
        left_child.is_active = false;
        right_child.is_active = false;
        // set partition unit as enabled
        let mut partition_unit = self.partition_unit.write();
        partition_unit.enabled = true;
    }

    /// fuse the children of this unit and also fuse the interfaces of them
    pub fn fuse(&mut self, parent_interface: &DualModuleInterfacePtr, children_interfaces: (&DualModuleInterfacePtr, &DualModuleInterfacePtr)) {
        self.static_fuse();
        let (left_interface, right_interface) = children_interfaces;
        let right_child_ptr = self.children.as_ref().unwrap().1.upgrade_force();
        lock_write!(right_child, right_child_ptr);
        // change the index of dual nodes in the right children
        let bias = left_interface.read_recursive().nodes_count();
        right_child.iterative_bias_dual_node_index(bias);
        parent_interface.fuse(left_interface, right_interface);
    }

    pub fn iterative_bias_dual_node_index(&mut self, bias: NodeIndex) {
        // depth-first search
        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
            if self.enable_parallel_execution {
                rayon::join(|| {
                    left_child_weak.upgrade_force().write().iterative_bias_dual_node_index(bias);
                }, || {
                    right_child_weak.upgrade_force().write().iterative_bias_dual_node_index(bias);
                });
            } else {
                left_child_weak.upgrade_force().write().iterative_bias_dual_node_index(bias);
                right_child_weak.upgrade_force().write().iterative_bias_dual_node_index(bias);
            }
        }
        // my serial module
        self.serial_module.bias_dual_node_index(bias);
    }

    /// if any descendant unit mirror or own the vertex
    pub fn is_vertex_in_descendant(&self, vertex_index: VertexIndex) -> bool {
        self.whole_range.contains(vertex_index) || self.extra_descendant_mirrored_vertices.contains(&vertex_index)
    }

    /// no need to deduplicate the events: the result will always be consistent with the last one
    fn execute_sync_events(&mut self, sync_requests: &[SyncRequest]) {
        // println!("sync_requests: {sync_requests:?}");
        for sync_request in sync_requests.iter() {
            sync_request.update();
            self.execute_sync_event(sync_request);
        }
    }

    /// iteratively prepare all growing and shrinking and append the sync requests
    fn iterative_prepare_all(&mut self, sync_requests: &mut Vec<SyncRequest>) {
        if !self.has_active_node {
            return  // early return to avoid going through all units
        }
        // depth-first search
        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
            if self.enable_parallel_execution {
                let mut sync_requests_2 = vec![];
                rayon::join(|| {
                    left_child_weak.upgrade_force().write().iterative_prepare_all(sync_requests);
                }, || {
                    right_child_weak.upgrade_force().write().iterative_prepare_all(&mut sync_requests_2);
                });
                sync_requests.append(&mut sync_requests_2);
            } else {
                left_child_weak.upgrade_force().write().iterative_prepare_all(sync_requests);
                right_child_weak.upgrade_force().write().iterative_prepare_all(sync_requests);
            }
        }
        // my serial module
        let local_sync_requests = self.serial_module.prepare_all();
        sync_requests.append(local_sync_requests);
    }

    /// iteratively set grow state
    fn iterative_set_grow_state(&mut self, dual_node_ptr: &DualNodePtr, grow_state: DualNodeGrowState, representative_vertex: VertexIndex) {
        if !self.whole_range.contains(representative_vertex) && !self.elevated_dual_nodes.contains(dual_node_ptr) {
            return  // no descendant related to this dual node
        }
        if grow_state != DualNodeGrowState::Stay {
            self.has_active_node = true;
        }
        // depth-first search
        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
            left_child_weak.upgrade_force().write().iterative_set_grow_state(dual_node_ptr, grow_state, representative_vertex);
            right_child_weak.upgrade_force().write().iterative_set_grow_state(dual_node_ptr, grow_state, representative_vertex);
        }
        if self.owning_range.contains(representative_vertex) || self.serial_module.contains_dual_node(dual_node_ptr) {
            self.serial_module.set_grow_state(dual_node_ptr, grow_state);
        }
    }

    /// check if elevated_dual_nodes contains any dual node in the list
    pub fn elevated_dual_nodes_contains_any(&self, nodes: &[DualNodePtr]) -> bool {
        for node_ptr in nodes.iter() {
            if self.elevated_dual_nodes.contains(node_ptr) {
                return true
            }
        }
        false
    }

    /// prepare the initial shrink of a blossom
    fn iterative_prepare_nodes_shrink(&mut self, nodes_circle: &[DualNodePtr], nodes_circle_vertices: &[VertexIndex]
            , sync_requests: &mut Vec<SyncRequest>) {
        if !self.whole_range.contains_any(nodes_circle_vertices) && !self.elevated_dual_nodes_contains_any(nodes_circle) {
            return  // no descendant related to this dual node
        }
        self.has_active_node = true;
        // depth-first search
        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
            if self.enable_parallel_execution {
                let mut sync_requests_2 = vec![];
                rayon::join(|| {
                    left_child_weak.upgrade_force().write().iterative_prepare_nodes_shrink(nodes_circle, nodes_circle_vertices, sync_requests);
                }, || {
                    right_child_weak.upgrade_force().write().iterative_prepare_nodes_shrink(nodes_circle, nodes_circle_vertices, &mut sync_requests_2);
                });
                sync_requests.append(&mut sync_requests_2);
            } else {
                left_child_weak.upgrade_force().write().iterative_prepare_nodes_shrink(nodes_circle, nodes_circle_vertices, sync_requests);
                right_child_weak.upgrade_force().write().iterative_prepare_nodes_shrink(nodes_circle, nodes_circle_vertices, sync_requests);
            }
        }
        let local_sync_requests = self.serial_module.prepare_nodes_shrink(nodes_circle);
        sync_requests.append(local_sync_requests);
    }

    fn iterative_add_blossom(&mut self, blossom_ptr: &DualNodePtr, nodes_circle: &[DualNodePtr], representative_vertex: VertexIndex
            , nodes_circle_vertices: &[VertexIndex]) {
        if !self.whole_range.contains_any(nodes_circle_vertices) && !self.elevated_dual_nodes_contains_any(nodes_circle) {
            return  // no descendant related to this dual node
        }
        self.has_active_node = true;
        // depth-first search
        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
            if self.enable_parallel_execution {
                rayon::join(|| {
                    left_child_weak.upgrade_force().write().iterative_add_blossom(blossom_ptr, nodes_circle, representative_vertex, nodes_circle_vertices);
                }, || {
                    right_child_weak.upgrade_force().write().iterative_add_blossom(blossom_ptr, nodes_circle, representative_vertex, nodes_circle_vertices);
                });
            } else {
                left_child_weak.upgrade_force().write().iterative_add_blossom(blossom_ptr, nodes_circle, representative_vertex, nodes_circle_vertices);
                right_child_weak.upgrade_force().write().iterative_add_blossom(blossom_ptr, nodes_circle, representative_vertex, nodes_circle_vertices);
            }
        }
        if self.owning_range.contains_any(nodes_circle_vertices) || self.serial_module.contains_dual_nodes_any(nodes_circle) {
            self.serial_module.add_blossom(blossom_ptr);
        }
        // if I'm not on the representative path of this dual node, I need to register the propagated_dual_node
        // note that I don't need to register propagated_grandson_dual_node because it's never gonna grow inside the blossom
        if !self.whole_range.contains(representative_vertex) {
            self.elevated_dual_nodes.insert(blossom_ptr.clone());
        }
    }

    fn iterative_add_defect_node(&mut self, dual_node_ptr: &DualNodePtr, vertex_index: VertexIndex) {
        // if the vertex is not hold by any descendant, simply return
        if !self.is_vertex_in_descendant(vertex_index) {
            return
        }
        self.has_active_node = true;
        // println!("sync_prepare_growth_update_sync_event: vertex {}, unit index {}", sync_event.vertex_index, self.unit_index);
        // depth-first search
        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
            if self.enable_parallel_execution {
                rayon::join(|| {
                    left_child_weak.upgrade_force().write().iterative_add_defect_node(dual_node_ptr, vertex_index);
                }, || {
                    right_child_weak.upgrade_force().write().iterative_add_defect_node(dual_node_ptr, vertex_index);
                });
            } else {
                left_child_weak.upgrade_force().write().iterative_add_defect_node(dual_node_ptr, vertex_index);
                right_child_weak.upgrade_force().write().iterative_add_defect_node(dual_node_ptr, vertex_index);
            }
        }
        // update on my serial module
        if self.serial_module.contains_vertex(vertex_index) {
            self.serial_module.add_defect_node(dual_node_ptr);
        }
        // if I'm not on the representative path of this dual node, I need to register the propagated_dual_node
        // note that I don't need to register propagated_grandson_dual_node because it's never gonna grow inside the blossom
        if !self.whole_range.contains(vertex_index) {
            self.elevated_dual_nodes.insert(dual_node_ptr.clone());
        }
    }

    fn iterative_compute_maximum_update_length(&mut self, group_max_update_length: &mut GroupMaxUpdateLength) -> bool {
        // early terminate if no active dual nodes anywhere in the descendant
        if !self.has_active_node {
            return false
        }
        let serial_module_group_max_update_length = self.serial_module.compute_maximum_update_length();
        if !serial_module_group_max_update_length.is_active() {
            self.has_active_node = false;
        }
        group_max_update_length.extend(serial_module_group_max_update_length);
        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
            let (left_child_has_active_node, right_child_has_active_node) = if self.enable_parallel_execution {
                let mut group_max_update_length_2 = GroupMaxUpdateLength::new();
                let (left_child_has_active_node, right_child_has_active_node) = rayon::join(|| {
                    left_child_weak.upgrade_force().write().iterative_compute_maximum_update_length(group_max_update_length)
                }, || {
                    right_child_weak.upgrade_force().write().iterative_compute_maximum_update_length(&mut group_max_update_length_2)
                });
                group_max_update_length.extend(group_max_update_length_2);
                (left_child_has_active_node, right_child_has_active_node)
            } else {
                (left_child_weak.upgrade_force().write().iterative_compute_maximum_update_length(group_max_update_length),
                    right_child_weak.upgrade_force().write().iterative_compute_maximum_update_length(group_max_update_length))
            };
            if left_child_has_active_node || right_child_has_active_node {
                self.has_active_node = true
            }
        }
        self.has_active_node
    }

    fn iterative_grow_dual_node(&mut self, dual_node_ptr: &DualNodePtr, length: Weight, representative_vertex: VertexIndex) {
        if !self.whole_range.contains(representative_vertex) && !self.elevated_dual_nodes.contains(dual_node_ptr) {
            return  // no descendant related to this dual node
        }
        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
            if self.enable_parallel_execution {
                rayon::join(|| {
                    left_child_weak.upgrade_force().write().iterative_grow_dual_node(dual_node_ptr, length, representative_vertex);
                }, || {
                    right_child_weak.upgrade_force().write().iterative_grow_dual_node(dual_node_ptr, length, representative_vertex);
                });
            } else {
                left_child_weak.upgrade_force().write().iterative_grow_dual_node(dual_node_ptr, length, representative_vertex);
                right_child_weak.upgrade_force().write().iterative_grow_dual_node(dual_node_ptr, length, representative_vertex);
            }
        }
        if self.owning_range.contains(representative_vertex) || self.serial_module.contains_dual_node(dual_node_ptr) {
            self.serial_module.grow_dual_node(dual_node_ptr, length);
        }
    }

    fn iterative_grow(&mut self, length: Weight) {
        // early terminate if no active dual nodes anywhere in the descendant
        if !self.has_active_node {
            return
        }
        self.serial_module.grow(length);
        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
            if self.enable_parallel_execution {
                rayon::join(|| {
                    left_child_weak.upgrade_force().write().iterative_grow(length);
                }, || {
                    right_child_weak.upgrade_force().write().iterative_grow(length);
                });
            } else {
                left_child_weak.upgrade_force().write().iterative_grow(length);
                right_child_weak.upgrade_force().write().iterative_grow(length);
            }
        }
    }

    fn iterative_remove_blossom(&mut self, dual_node_ptr: &DualNodePtr, representative_vertex: VertexIndex) {
        if !self.whole_range.contains(representative_vertex) && !self.elevated_dual_nodes.contains(dual_node_ptr) {
            return  // no descendant related to this dual node
        }
        self.has_active_node = true;
        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
            if self.enable_parallel_execution {
                rayon::join(|| {
                    left_child_weak.upgrade_force().write().iterative_remove_blossom(dual_node_ptr, representative_vertex);
                }, || {
                    right_child_weak.upgrade_force().write().iterative_remove_blossom(dual_node_ptr, representative_vertex);
                });
            } else {
                left_child_weak.upgrade_force().write().iterative_remove_blossom(dual_node_ptr, representative_vertex);
                right_child_weak.upgrade_force().write().iterative_remove_blossom(dual_node_ptr, representative_vertex);
            }
        }
        if self.owning_range.contains(representative_vertex) || self.serial_module.contains_dual_node(dual_node_ptr) {
            self.serial_module.remove_blossom(dual_node_ptr.clone());
        }
    }

}

impl<SerialModule: DualModuleImpl + Send + Sync> DualModuleParallelUnitPtr<SerialModule> {

    /// create a simple wrapper over a serial dual module
    pub fn new_wrapper(serial_module: SerialModule, unit_index: usize, partition_info: Arc<PartitionInfo>, partition_unit: PartitionUnitPtr
            , enable_parallel_execution: bool) -> Self {
        let partition_unit_info = &partition_info.units[unit_index];
        Self::new_value(DualModuleParallelUnit {
            unit_index,
            partition_info: partition_info.clone(),
            partition_unit,
            is_active: partition_unit_info.children.is_none(),  // only activate the leaves in the dependency tree
            whole_range: partition_unit_info.whole_range,
            owning_range: partition_unit_info.owning_range,
            extra_descendant_mirrored_vertices: HashSet::new(),  // to be filled later
            serial_module,
            children: None,  // to be filled later
            parent: None,  // to be filled later
            elevated_dual_nodes: PtrWeakHashSet::new(),
            empty_sync_request: vec![],
            enable_parallel_execution,
            has_active_node: true,  // by default to true, because children may have active nodes
        })
    }

}

/// We cannot implement async function because a RwLockWriteGuard implements !Send
impl<SerialModule: DualModuleImpl + Send + Sync> DualModuleImpl for DualModuleParallelUnit<SerialModule> {

    /// clear all growth and existing dual nodes
    fn new_empty(_initializer: &SolverInitializer) -> Self {
        panic!("creating parallel unit directly from initializer is forbidden, use `DualModuleParallel::new` instead");
    }

    /// clear all growth and existing dual nodes
    fn clear(&mut self) {
        self.has_active_node = true;
        self.serial_module.clear()
    }

    /// add a new dual node from dual module root
    fn add_dual_node(&mut self, dual_node_ptr: &DualNodePtr) {
        self.has_active_node = true;
        let representative_vertex = dual_node_ptr.get_representative_vertex();
        match &dual_node_ptr.read_recursive().class {
            // fast path: if dual node is a single vertex, then only add to the owning node; single vertex dual node can only add when dual variable = 0
            DualNodeClass::DefectVertex { defect_index } => {
                if self.owning_range.contains(representative_vertex) {  // fast path: the most common one
                    self.iterative_add_defect_node(dual_node_ptr, *defect_index);
                } else {
                    // find the one that owns it and add the dual node, and then add the serial_module
                    if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
                        let mut child_ptr = if representative_vertex < self.owning_range.start() { left_child_weak.upgrade_force() } else { right_child_weak.upgrade_force() };
                        let mut is_owning_dual_node = false;
                        while !is_owning_dual_node {
                            let mut child = child_ptr.write();
                            child.has_active_node = true;
                            debug_assert!(child.whole_range.contains(representative_vertex), "selected child must contains the vertex");
                            is_owning_dual_node = child.owning_range.contains(representative_vertex);
                            if !is_owning_dual_node {  // search for the grandsons
                                let grandson_ptr = if let Some((left_child_weak, right_child_weak)) = child.children.as_ref() {
                                    if representative_vertex < child.owning_range.start() { left_child_weak.upgrade_force() } else { right_child_weak.upgrade_force() }
                                } else { unreachable!() };
                                drop(child);
                                child_ptr = grandson_ptr;
                            }
                        }
                        lock_write!(child, child_ptr);
                        child.iterative_add_defect_node(dual_node_ptr, *defect_index);
                    } else { unreachable!() }
                }
                // if it's children mirrors this vertex as well, then it's necessary to add this dual node to those children as well
            },
            // this is a blossom, meaning it's children dual nodes may reside on any path
            DualNodeClass::Blossom { nodes_circle, .. } => {
                // first set all children dual nodes as shrinking, to be safe
                let nodes_circle_ptrs: Vec<_> = nodes_circle.iter().map(|weak| weak.upgrade_force()).collect();
                let nodes_circle_vertices: Vec<_> = nodes_circle.iter().map(|weak| weak.upgrade_force().get_representative_vertex()).collect();
                self.prepare_nodes_shrink(&nodes_circle_ptrs);
                self.iterative_add_blossom(dual_node_ptr, &nodes_circle_ptrs, representative_vertex, &nodes_circle_vertices);
            },
        }
    }

    fn remove_blossom(&mut self, dual_node_ptr: DualNodePtr) {
        let representative_vertex = dual_node_ptr.get_representative_vertex();
        self.iterative_remove_blossom(&dual_node_ptr, representative_vertex);
    }

    fn set_grow_state(&mut self, dual_node_ptr: &DualNodePtr, grow_state: DualNodeGrowState) {
        // println!("unit {} set_grow_state {:?} {:?}", self.unit_index, dual_node_ptr, grow_state);
        // find the path towards the owning unit of this dual node, and also try paths towards the elevated
        let representative_vertex = dual_node_ptr.get_representative_vertex();
        debug_assert!(self.whole_range.contains(representative_vertex), "cannot set growth state of dual node outside of the scope");
        self.iterative_set_grow_state(dual_node_ptr, grow_state, representative_vertex);
    }

    fn compute_maximum_update_length_dual_node(&mut self, dual_node_ptr: &DualNodePtr, is_grow: bool, simultaneous_update: bool) -> MaxUpdateLength {
        // TODO: execute on all nodes that handles this dual node
        let max_update_length = self.serial_module.compute_maximum_update_length_dual_node(dual_node_ptr, is_grow, simultaneous_update);
        if !(self.children.is_none() && self.is_active) {  // for those base partitions without being fused, we don't need to update
            max_update_length.update();  // only necessary after involved in fusion
        }
        max_update_length
    }

    fn compute_maximum_update_length(&mut self) -> GroupMaxUpdateLength {
        // first prepare all dual node for growth and shrink accordingly and synchronize them
        self.prepare_all();
        // them do the functions independently
        let mut group_max_update_length = GroupMaxUpdateLength::new();
        self.iterative_compute_maximum_update_length(&mut group_max_update_length);
        if !(self.children.is_none() && self.is_active) {  // for those base partitions without being fused, we don't need to update
            group_max_update_length.update();  // only necessary after involved in fusion
        }
        group_max_update_length
    }

    fn grow_dual_node(&mut self, dual_node_ptr: &DualNodePtr, length: Weight) {
        let representative_vertex = dual_node_ptr.get_representative_vertex();
        debug_assert!(self.whole_range.contains(representative_vertex), "cannot grow dual node outside of the scope");
        self.iterative_grow_dual_node(dual_node_ptr, length, representative_vertex);
    }

    fn grow(&mut self, length: Weight) {
        self.iterative_grow(length);
    }

    fn load_edge_modifier(&mut self, edge_modifier: &[(EdgeIndex, Weight)]) {
        // TODO: split the edge modifier and then load them to individual descendant units
        // hint: each edge could appear in any unit that mirrors the two vertices
        self.serial_module.load_edge_modifier(edge_modifier)
    }

    fn prepare_nodes_shrink(&mut self, nodes_circle: &[DualNodePtr]) -> &mut Vec<SyncRequest> {
        let nodes_circle_vertices: Vec<_> = nodes_circle.iter().map(|ptr| ptr.get_representative_vertex()).collect();
        let mut sync_requests = vec![];
        loop {
            self.iterative_prepare_nodes_shrink(nodes_circle, &nodes_circle_vertices, &mut sync_requests);
            if sync_requests.is_empty() {
                break
            }
            // println!("sync_requests: {sync_requests:?}");
            self.execute_sync_events(&sync_requests);
            sync_requests.clear();
        }
        &mut self.empty_sync_request
    }

    fn prepare_all(&mut self) -> &mut Vec<SyncRequest> {
        if self.children.is_none() {
            // don't do anything, not even prepare the growth because it will be done in the serial module
        } else {
            let mut sync_requests = vec![];
            loop {
                self.iterative_prepare_all(&mut sync_requests);
                if sync_requests.is_empty() {
                    break
                }
                // println!("sync_requests: {sync_requests:?}");
                self.execute_sync_events(&sync_requests);
                sync_requests.clear();
            }
        }
        &mut self.empty_sync_request
    }

    fn execute_sync_event(&mut self, sync_event: &SyncRequest) {
        // if the vertex is not hold by any descendant, simply return
        if !self.is_vertex_in_descendant(sync_event.vertex_index) {
            return
        }
        self.has_active_node = true;
        // println!("sync_prepare_growth_update_sync_event: vertex {}, unit index {}", sync_event.vertex_index, self.unit_index);
        // depth-first search
        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
            left_child_weak.upgrade_force().write().execute_sync_event(sync_event);
            right_child_weak.upgrade_force().write().execute_sync_event(sync_event);
        }
        // update on my serial module
        if self.serial_module.contains_vertex(sync_event.vertex_index) {
            // println!("update: vertex {}, unit index {}", sync_event.vertex_index, self.unit_index);
            self.serial_module.execute_sync_event(sync_event);
        }
        // if I'm not on the representative path of this dual node, I need to register the propagated_dual_node
        // note that I don't need to register propagated_grandson_dual_node because it's never gonna grow inside the blossom
        if !self.whole_range.contains(sync_event.vertex_index) {
            if let Some((propagated_dual_node_weak, _)) = sync_event.propagated_dual_node.as_ref() {
                self.elevated_dual_nodes.insert(propagated_dual_node_weak.upgrade_force());
            }
        }
    }

}

/// interface consists of several vertices; each vertex exists as a virtual vertex in several different serial dual modules.
/// each virtual vertex exists in at most one interface
pub struct InterfaceData {
    /// the serial dual modules that processes these virtual vertices,
    pub possession_modules: Vec<DualModuleSerialWeak>,
    /// the virtual vertices references in different modules, [idx of serial dual module] [idx of interfacing vertex]
    pub interfacing_vertices: Vec<Vec<VertexWeak>>,
}

/// interface between dual modules, consisting of a list of nodes of virtual nodes that sits on different modules
pub struct Interface {
    /// unique interface id for ease of zero-cost switching
    pub interface_id: usize,
    /// link to interface data
    pub data: Weak<InterfaceData>,
}


#[cfg(test)]
pub mod tests {
    use super::*;
    use super::super::example_codes::*;
    use super::super::primal_module::*;
    use super::super::primal_module_serial::*;

    pub fn dual_module_parallel_basic_standard_syndrome_optional_viz<F>(mut code: impl ExampleCode, visualize_filename: Option<String>
            , mut defect_vertices: Vec<VertexIndex>, final_dual: Weight, partition_func: F, reordered_vertices: Option<Vec<VertexIndex>>)
            -> (DualModuleInterfacePtr, PrimalModuleSerialPtr, DualModuleParallel<DualModuleSerial>) where F: Fn(&SolverInitializer, &mut PartitionConfig) {
        println!("{defect_vertices:?}");
        if let Some(reordered_vertices) = &reordered_vertices {
            code.reorder_vertices(reordered_vertices);
            defect_vertices = translated_defect_to_reordered(reordered_vertices, &defect_vertices);
        }
        let mut visualizer = match visualize_filename.as_ref() {
            Some(visualize_filename) => {
                let visualizer = Visualizer::new(Some(visualize_data_folder() + visualize_filename.as_str()), code.get_positions(), true).unwrap();
                print_visualize_link(visualize_filename.clone());
                Some(visualizer)
            }, None => None
        };
        let initializer = code.get_initializer();
        let mut partition_config = PartitionConfig::new(initializer.vertex_num);
        partition_func(&initializer, &mut partition_config);
        println!("partition_config: {partition_config:?}");
        let partition_info = partition_config.info();
        // create dual module
        let mut dual_module = DualModuleParallel::new_config(&initializer, &partition_info, DualModuleParallelConfig::default());
        dual_module.static_fuse_all();
        // create primal module
        let mut primal_module = PrimalModuleSerialPtr::new_empty(&initializer);
        primal_module.write().debug_resolve_only_one = true;  // to enable debug mode
        // try to work on a simple syndrome
        code.set_defect_vertices(&defect_vertices);
        let interface_ptr = DualModuleInterfacePtr::new_empty();
        primal_module.solve_visualizer(&interface_ptr, &code.get_syndrome(), &mut dual_module, visualizer.as_mut());
        let perfect_matching = primal_module.perfect_matching(&interface_ptr, &mut dual_module);
        let mut subgraph_builder = SubGraphBuilder::new(&initializer);
        subgraph_builder.load_perfect_matching(&perfect_matching);
        let subgraph = subgraph_builder.get_subgraph();
        if let Some(visualizer) = visualizer.as_mut() {
            visualizer.snapshot_combined("perfect matching and subgraph".to_string(), vec![&interface_ptr, &dual_module
                , &perfect_matching, &VisualizeSubgraph::new(&subgraph)]).unwrap();
        }
        assert_eq!(interface_ptr.sum_dual_variables(), subgraph_builder.total_weight(), "unmatched sum dual variables");
        assert_eq!(interface_ptr.sum_dual_variables(), final_dual * 2, "unexpected final dual variable sum");
        (interface_ptr, primal_module, dual_module)
    }

    pub fn dual_module_parallel_standard_syndrome<F>(code: impl ExampleCode, visualize_filename: String, defect_vertices: Vec<VertexIndex>
            , final_dual: Weight, partition_func: F, reordered_vertices: Option<Vec<VertexIndex>>)
            -> (DualModuleInterfacePtr, PrimalModuleSerialPtr, DualModuleParallel<DualModuleSerial>) where F: Fn(&SolverInitializer, &mut PartitionConfig) {
        dual_module_parallel_basic_standard_syndrome_optional_viz(code, Some(visualize_filename), defect_vertices, final_dual, partition_func, reordered_vertices)
    }

    /// test a simple case
    #[test]
    fn dual_module_parallel_basic_1() {  // cargo test dual_module_parallel_basic_1 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_basic_1.json");
        let defect_vertices = vec![39, 52, 63, 90, 100];
        let half_weight = 500;
        dual_module_parallel_standard_syndrome(CodeCapacityPlanarCode::new(11, 0.1, half_weight), visualize_filename, defect_vertices, 9 * half_weight, |initializer, _config| {
            println!("initializer: {initializer:?}");
        }, None);
    }

    /// split into 2, with no syndrome vertex on the interface
    #[test]
    fn dual_module_parallel_basic_2() {  // cargo test dual_module_parallel_basic_2 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_basic_2.json");
        let defect_vertices = vec![39, 52, 63, 90, 100];
        let half_weight = 500;
        dual_module_parallel_standard_syndrome(CodeCapacityPlanarCode::new(11, 0.1, half_weight), visualize_filename, defect_vertices, 9 * half_weight, |_initializer, config| {
            config.partitions = vec![
                VertexRange::new(0, 72),    // unit 0
                VertexRange::new(84, 132),  // unit 1
            ];
            config.fusions = vec![
                (0, 1),  // unit 2, by fusing 0 and 1
            ];
        }, None);
    }

    /// split into 2, with a syndrome vertex on the interface
    #[test]
    fn dual_module_parallel_basic_3() {  // cargo test dual_module_parallel_basic_3 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_basic_3.json");
        let defect_vertices = vec![39, 52, 63, 90, 100];
        let half_weight = 500;
        dual_module_parallel_standard_syndrome(CodeCapacityPlanarCode::new(11, 0.1, half_weight), visualize_filename, defect_vertices, 9 * half_weight, |_initializer, config| {
            config.partitions = vec![
                VertexRange::new(0, 60),    // unit 0
                VertexRange::new(72, 132),  // unit 1
            ];
            config.fusions = vec![
                (0, 1),  // unit 2, by fusing 0 and 1
            ];
        }, None);
    }

    /// split into 4, with no syndrome vertex on the interface
    #[test]
    fn dual_module_parallel_basic_4() {  // cargo test dual_module_parallel_basic_4 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_basic_4.json");
        // reorder vertices to enable the partition;
        let defect_vertices = vec![39, 52, 63, 90, 100];  // indices are before the reorder
        let half_weight = 500;
        dual_module_parallel_standard_syndrome(CodeCapacityPlanarCode::new(11, 0.1, half_weight), visualize_filename, defect_vertices, 9 * half_weight, |_initializer, config| {
            config.partitions = vec![
                VertexRange::new(0, 36),
                VertexRange::new(42, 72),
                VertexRange::new(84, 108),
                VertexRange::new(112, 132),
            ];
            config.fusions = vec![
                (0, 1),
                (2, 3),
                (4, 5),
            ];
        }, Some((|| {
            let mut reordered_vertices = vec![];
            let split_horizontal = 6;
            let split_vertical = 5;
            for i in 0..split_horizontal {  // left-top block
                for j in 0..split_vertical {
                    reordered_vertices.push(i * 12 + j);
                }
                reordered_vertices.push(i * 12 + 11);
            }
            for i in 0..split_horizontal {  // interface between the left-top block and the right-top block
                reordered_vertices.push(i * 12 + split_vertical);
            }
            for i in 0..split_horizontal {  // right-top block
                for j in (split_vertical+1)..10 {
                    reordered_vertices.push(i * 12 + j);
                }
                reordered_vertices.push(i * 12 + 10);
            }
            {  // the big interface between top and bottom
                for j in 0..12 {
                    reordered_vertices.push(split_horizontal * 12 + j);
                }
            }
            for i in (split_horizontal+1)..11 {  // left-bottom block
                for j in 0..split_vertical {
                    reordered_vertices.push(i * 12 + j);
                }
                reordered_vertices.push(i * 12 + 11);
            }
            for i in (split_horizontal+1)..11 {  // interface between the left-bottom block and the right-bottom block
                reordered_vertices.push(i * 12 + split_vertical);
            }
            for i in (split_horizontal+1)..11 {  // right-bottom block
                for j in (split_vertical+1)..10 {
                    reordered_vertices.push(i * 12 + j);
                }
                reordered_vertices.push(i * 12 + 10);
            }
            reordered_vertices
        })()));
    }

    /// split into 4, with 2 defect vertices on parent interfaces
    #[test]
    fn dual_module_parallel_basic_5() {  // cargo test dual_module_parallel_basic_5 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_basic_5.json");
        // reorder vertices to enable the partition;
        let defect_vertices = vec![39, 52, 63, 90, 100];  // indices are before the reorder
        let half_weight = 500;
        dual_module_parallel_standard_syndrome(CodeCapacityPlanarCode::new(11, 0.1, half_weight), visualize_filename, defect_vertices, 9 * half_weight, |_initializer, config| {
            config.partitions = vec![
                VertexRange::new(0, 25),
                VertexRange::new(30, 60),
                VertexRange::new(72, 97),
                VertexRange::new(102, 132),
            ];
            config.fusions = vec![
                (0, 1),
                (2, 3),
                (4, 5),
            ];
        }, Some((|| {
            let mut reordered_vertices = vec![];
            let split_horizontal = 5;
            let split_vertical = 4;
            for i in 0..split_horizontal {  // left-top block
                for j in 0..split_vertical {
                    reordered_vertices.push(i * 12 + j);
                }
                reordered_vertices.push(i * 12 + 11);
            }
            for i in 0..split_horizontal {  // interface between the left-top block and the right-top block
                reordered_vertices.push(i * 12 + split_vertical);
            }
            for i in 0..split_horizontal {  // right-top block
                for j in (split_vertical+1)..10 {
                    reordered_vertices.push(i * 12 + j);
                }
                reordered_vertices.push(i * 12 + 10);
            }
            {  // the big interface between top and bottom
                for j in 0..12 {
                    reordered_vertices.push(split_horizontal * 12 + j);
                }
            }
            for i in (split_horizontal+1)..11 {  // left-bottom block
                for j in 0..split_vertical {
                    reordered_vertices.push(i * 12 + j);
                }
                reordered_vertices.push(i * 12 + 11);
            }
            for i in (split_horizontal+1)..11 {  // interface between the left-bottom block and the right-bottom block
                reordered_vertices.push(i * 12 + split_vertical);
            }
            for i in (split_horizontal+1)..11 {  // right-bottom block
                for j in (split_vertical+1)..10 {
                    reordered_vertices.push(i * 12 + j);
                }
                reordered_vertices.push(i * 12 + 10);
            }
            reordered_vertices
        })()));
    }

    fn dual_module_parallel_debug_repetition_code_common(d: VertexNum, visualize_filename: String, defect_vertices: Vec<VertexIndex>, final_dual: Weight) {
        let half_weight = 500;
        let split_vertical = (d + 1) / 2;
        dual_module_parallel_standard_syndrome(CodeCapacityRepetitionCode::new(d, 0.1, half_weight), visualize_filename, defect_vertices, final_dual * half_weight, |initializer, config| {
            config.partitions = vec![
                VertexRange::new(0, split_vertical + 1),
                VertexRange::new(split_vertical + 2, initializer.vertex_num),
            ];
            config.fusions = vec![
                (0, 1),
            ];
        }, Some((|| {
            let mut reordered_vertices = vec![];
            for j in 0..split_vertical {
                reordered_vertices.push(j);
            }
            reordered_vertices.push(d);
            for j in split_vertical..d {
                reordered_vertices.push(j);
            }
            reordered_vertices
        })()));
    }

    /// debug blossom not growing properly
    #[test]
    fn dual_module_parallel_debug_1() {  // cargo test dual_module_parallel_debug_1 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_debug_1.json");
        let defect_vertices = vec![2, 3, 4, 5, 6, 7, 8];  // indices are before the reorder
        dual_module_parallel_debug_repetition_code_common(11, visualize_filename, defect_vertices, 5);
    }

    /// debug 'internal error: entered unreachable code: VertexShrinkStop conflict cannot be solved by primal module
    /// the reason of this bug is that a shrinking node on the interface is sandwiched by two growing nodes resides on different children units
    /// for the serial implementation, this event can be easily handled by doing special configs
    /// but for the fused units, how to do it?
    /// This is the benefit of using software to develop first; if directly working on the hardware implementation, one would have to add more interface
    /// to support it, which could be super time-consuming
    #[test]
    fn dual_module_parallel_debug_2() {  // cargo test dual_module_parallel_debug_2 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_debug_2.json");
        let defect_vertices = vec![5, 6, 7];  // indices are before the reorder
        dual_module_parallel_debug_repetition_code_common(11, visualize_filename, defect_vertices, 4);
    }

    /// the reason for this bug is that I forgot to set dual_variable correctly, leading to false VertexShrinkStop event at the 
    #[test]
    fn dual_module_parallel_debug_3() {  // cargo test dual_module_parallel_debug_3 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_debug_3.json");
        let defect_vertices = vec![3, 5, 7];  // indices are before the reorder
        dual_module_parallel_debug_repetition_code_common(11, visualize_filename, defect_vertices, 5);
    }

    /// incorrect final result
    /// the reason is I didn't search through all the representative vertices of all children nodes, causing the parent blossom not propagating correctly
    #[test]
    fn dual_module_parallel_debug_4() {  // cargo test dual_module_parallel_debug_4 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_debug_4.json");
        let defect_vertices = vec![2, 3, 5, 6, 7];  // indices are before the reorder
        dual_module_parallel_debug_repetition_code_common(11, visualize_filename, defect_vertices, 5);
    }

    /// unwrap fail on dual node to internal dual node
    /// the reason is I forgot to implement the remove_blossom API...
    #[test]
    fn dual_module_parallel_debug_5() {  // cargo test dual_module_parallel_debug_5 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_debug_5.json");
        let defect_vertices = vec![0, 4, 7, 8, 9, 11];  // indices are before the reorder
        dual_module_parallel_debug_repetition_code_common(15, visualize_filename, defect_vertices, 7);
    }

    fn dual_module_parallel_debug_planar_code_common(d: VertexNum, visualize_filename: String, defect_vertices: Vec<VertexIndex>, final_dual: Weight) {
        let half_weight = 500;
        let split_horizontal = (d + 1) / 2;
        let row_count = d + 1;
        dual_module_parallel_standard_syndrome(CodeCapacityPlanarCode::new(d, 0.1, half_weight), visualize_filename, defect_vertices, final_dual * half_weight, |initializer, config| {
            config.partitions = vec![
                VertexRange::new(0, split_horizontal * row_count),
                VertexRange::new((split_horizontal + 1) * row_count, initializer.vertex_num),
            ];
            config.fusions = vec![
                (0, 1),
            ];
        }, None);
    }

    /// panic 'one cannot conflict with itself, double check to avoid deadlock'
    /// reason: when merging two `VertexShrinkStop` events into a single `Conflicting` event, I forget to check whether the two pointers are the same;
    /// if so, I should simply ignore it
    #[test]
    fn dual_module_parallel_debug_6() {  // cargo test dual_module_parallel_debug_6 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_debug_6.json");
        let defect_vertices = vec![10, 11, 13, 32, 36, 37, 40, 44];  // indices are before the reorder
        dual_module_parallel_debug_planar_code_common(7, visualize_filename, defect_vertices, 5);
    }

    /// panic 'one cannot conflict with itself, double check to avoid deadlock'
    /// reason: when comparing the pointers of two `VertexShrinkStop` events, only compare their conflicting dual node, not the touching dual node
    #[test]
    fn dual_module_parallel_debug_7() {  // cargo test dual_module_parallel_debug_7 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_debug_7.json");
        let defect_vertices = vec![3, 12, 21, 24, 27, 28, 33, 35, 36, 43, 50, 51];  // indices are before the reorder
        dual_module_parallel_debug_planar_code_common(7, visualize_filename, defect_vertices, 10);
    }

    /// panic `Option::unwrap()` on a `None` value', src/dual_module.rs:242:1
    #[test]
    fn dual_module_parallel_debug_8() {  // cargo test dual_module_parallel_debug_8 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_debug_8.json");
        let defect_vertices = vec![1, 2, 3, 4, 9, 10, 13, 16, 17, 19, 24, 29, 33, 36, 37, 44, 48, 49, 51, 52];  // indices are before the reorder
        dual_module_parallel_debug_planar_code_common(7, visualize_filename, defect_vertices, 13);
    }

    /// panicked at 'dual node of edge should be some', src/dual_module_serial.rs:379:13
    /// reason: blossom's boundary has duplicate edges, solved by adding dedup functionality to edges
    #[test]
    fn dual_module_parallel_debug_9() {  // cargo test dual_module_parallel_debug_9 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_debug_9.json");
        let defect_vertices = vec![60, 61, 72, 74, 84, 85, 109];  // indices are before the reorder
        dual_module_parallel_debug_planar_code_common(11, visualize_filename, defect_vertices, 6);
    }

    /// infinite loop at group_max_update_length: Conflicts(([Conflicting((12, 4), (15, 5))], {}))
    /// reason: I falsely use representative_vertex of the blossom instead of the representative vertices in the nodes circle in sync_prepare_blossom_initial_shrink
    #[test]
    fn dual_module_parallel_debug_10() {  // cargo test dual_module_parallel_debug_10 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_debug_10.json");
        let defect_vertices = vec![145, 146, 165, 166, 183, 185, 203, 204, 205, 225, 264];  // indices are before the reorder
        dual_module_parallel_debug_planar_code_common(19, visualize_filename, defect_vertices, 11);
    }

    /// panicked at 'dual node of edge should be none', src/dual_module_serial.rs:400:25
    /// reason: duplicate edge in the boundary... again...
    /// this time it's because when judging whether an edge is already in the boundary, I mistakenly put the clearing edge logic into
    /// the if condition as well... when the edge is duplicate in the boundary already, my code will not clear the edge properly
    #[test]
    fn dual_module_parallel_debug_11() {  // cargo test dual_module_parallel_debug_11 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_debug_11.json");
        let defect_vertices = vec![192, 193, 194, 212, 214, 232, 233];  // indices are before the reorder
        dual_module_parallel_debug_planar_code_common(19, visualize_filename, defect_vertices, 7);
    }

    /// panicked at 'no sync requests should arise here; make sure to deal with all sync requests before growing', src/dual_module_serial.rs:582:13
    /// just loop the synchronization process until no sync requests emerge
    #[test]
    fn dual_module_parallel_debug_12() {  // cargo test dual_module_parallel_debug_12 -- --nocapture
        let visualize_filename = format!("dual_module_parallel_debug_12.json");
        let defect_vertices = vec![197, 216, 235, 275, 296, 316];  // indices are before the reorder
        dual_module_parallel_debug_planar_code_common(19, visualize_filename, defect_vertices, 5);
    }

    /// test rayon global thread pool
    #[test]
    fn dual_module_parallel_rayon_test_1() {  // cargo test dual_module_parallel_rayon_test_1 -- --nocapture
        rayon::scope(|_| {
            println!("A");
            rayon::scope(|s| {
                s.spawn(|_| println!("B"));
                s.spawn(|_| println!("C"));
                s.spawn(|_| println!("D"));
                s.spawn(|_| println!("E"));
            });
            println!("F");
            rayon::scope(|s| {
                s.spawn(|_| println!("G"));
                s.spawn(|_| println!("H"));
                s.spawn(|_| println!("J"));
            });
            println!("K");
        });
    }

    #[test]
    fn dual_module_parallel_rayon_test_2() {  // cargo test dual_module_parallel_rayon_test_2 -- --nocapture
        let mut results = vec![];
        rayon::scope(|_| {
            results.push("A");
            let (mut ret_b, mut ret_c, mut ret_d, mut ret_e) = (None, None, None, None);
            rayon::scope(|s| {
                s.spawn(|_| ret_b = Some("B"));
                s.spawn(|_| ret_c = Some("C"));
                s.spawn(|_| ret_d = Some("D"));
                s.spawn(|_| ret_e = Some("E"));
            });
            results.push(ret_b.unwrap());
            results.push(ret_c.unwrap());
            results.push(ret_d.unwrap());
            results.push(ret_e.unwrap());
            results.push("F");
            let (mut ret_g, mut ret_h, mut ret_j) = (None, None, None);
            rayon::scope(|s| {
                s.spawn(|_| ret_g = Some("G"));
                s.spawn(|_| ret_h = Some("H"));
                s.spawn(|_| ret_j = Some("J"));
            });
            results.push(ret_g.unwrap());
            results.push(ret_h.unwrap());
            results.push(ret_j.unwrap());
            results.push("K");
        });
        println!("results: {results:?}");
    }

    #[test]
    fn dual_module_parallel_rayon_test_3() {  // cargo test dual_module_parallel_rayon_test_3 -- --nocapture
        let mut results = vec![];
        rayon::scope(|_| {
            results.push("A");
            results.par_extend(["B", "C", "D", "E"].into_par_iter().map(|id| {
                // some complex calculation
                id
            }));
            results.push("F");
            results.par_extend(["G", "H", "J"].into_par_iter().map(|id| {
                // some complex calculation
                id
            }));
            results.push("K");
        });
        println!("results: {results:?}");
    }

}