fusion_blossom/
dual_module_parallel.rs

//! Serial Dual Parallel
//!
//! A parallel implementation of the dual module, leveraging the serial version
//!
//! While it assumes a single machine (using the async runtime of Rust), the design targets a distributed version
//! that can be spawned on different machines efficiently. The distributed version can be built based on this
//! parallel version.
//!
//! Notes:
//!
//! According to https://tokio.rs/tokio/tutorial, tokio is not good for parallel computation. It suggests
//! using https://docs.rs/rayon/latest/rayon/.
//!

15#![cfg_attr(feature = "unsafe_pointer", allow(dropping_references))]
16use super::complete_graph::CompleteGraph;
17use super::dual_module::*;
18use super::dual_module_serial::*;
19use super::pointers::*;
20use super::util::*;
21use super::visualize::*;
22use crate::rayon::prelude::*;
23use crate::serde_json;
24use crate::weak_table::PtrWeakHashSet;
25use serde::{Deserialize, Serialize};
26use std::collections::{BTreeSet, HashSet};
27use std::sync::{Arc, Weak};
28
/// a parallel implementation of the dual module: a collection of [`DualModuleParallelUnit`]s,
/// each wrapping a serial dual module that owns a partition of the decoding graph; fused units
/// are appended after the basic partitioned units
pub struct DualModuleParallel<SerialModule: DualModuleImpl + Send + Sync> {
    /// the basic wrapped serial modules at the beginning, afterwards the fused units are appended after them
    pub units: Vec<ArcManualSafeLock<DualModuleParallelUnit<SerialModule>>>,
    /// local configuration
    pub config: DualModuleParallelConfig,
    /// partition information generated by the config
    pub partition_info: Arc<PartitionInfo>,
    /// thread pool used to execute async functions in parallel
    pub thread_pool: Arc<rayon::ThreadPool>,
    /// an empty sync requests queue just to implement the trait
    pub empty_sync_request: Vec<SyncRequest>,
}
41
/// user-tunable configuration of [`DualModuleParallel`], deserializable from JSON;
/// unknown fields are rejected, and every field falls back to the functions in
/// [`dual_module_parallel_default_configs`] when absent
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct DualModuleParallelConfig {
    /// enable async execution of dual operations; only used when calling top-level operations, not used in individual units
    #[serde(default = "dual_module_parallel_default_configs::thread_pool_size")]
    pub thread_pool_size: usize,
    /// strategy of edges placement: if edges are placed in the fusion unit, it's good for software implementation because there are no duplicate
    /// edges and no unnecessary vertices in the descendant units. On the other hand, it's not very favorable if implemented on hardware: the
    /// fusion unit usually contains a very small amount of vertices and edges for the interfacing between two blocks, but maintaining this small graph
    /// may consume additional hardware resources and increase the decoding latency. I want the algorithm to finally work on the hardware efficiently
    /// so I need to verify that it does work by holding all the fusion unit's owned vertices and edges in the descendants, although usually duplicated.
    #[serde(default = "dual_module_parallel_default_configs::edges_in_fusion_unit")]
    pub edges_in_fusion_unit: bool,
    /// enable parallel execution of a fused dual module
    #[serde(default = "dual_module_parallel_default_configs::enable_parallel_execution")]
    pub enable_parallel_execution: bool,
}
59
60impl Default for DualModuleParallelConfig {
61    fn default() -> Self {
62        serde_json::from_value(json!({})).unwrap()
63    }
64}
65
/// default values for [`DualModuleParallelConfig`], referenced by its serde attributes
pub mod dual_module_parallel_default_configs {
    /// 0 delegates the choice to rayon, which defaults to the number of CPU cores
    pub fn thread_pool_size() -> usize {
        0
    }
    /// by default use the software-friendly approach because of removing duplicate edges
    pub fn edges_in_fusion_unit() -> bool {
        true
    }
    /// by default disabled: parallel execution may cause too much context switch, yet not much speed benefit
    pub fn enable_parallel_execution() -> bool {
        false
    }
}
78
/// a single unit in the parallel dual module: wraps one serial dual module over a vertex
/// partition, and links to its parent/children units in the fusion tree via weak pointers
pub struct DualModuleParallelUnit<SerialModule: DualModuleImpl + Send + Sync> {
    /// the index
    pub unit_index: usize,
    /// partition information generated by the config
    pub partition_info: Arc<PartitionInfo>,
    /// information shared with serial module
    pub partition_unit: PartitionUnitPtr,
    /// whether it's active or not; some units are "placeholder" units that are not active until they actually fuse their children
    pub is_active: bool,
    /// the vertex range of this parallel unit consists of all the owning_range of its descendants
    pub whole_range: VertexRange,
    /// the vertices owned by this unit, note that owning_range is a subset of whole_range
    pub owning_range: VertexRange,
    /// the vertices that are mirrored outside of whole_range, in order to propagate a vertex's sync event to every unit that mirrors it
    pub extra_descendant_mirrored_vertices: HashSet<VertexIndex>,
    /// the owned serial dual module
    pub serial_module: SerialModule,
    /// left and right children dual modules (weak, to avoid a reference cycle with `parent`)
    pub children: Option<(
        DualModuleParallelUnitWeak<SerialModule>,
        DualModuleParallelUnitWeak<SerialModule>,
    )>,
    /// parent dual module (weak, to avoid a reference cycle with `children`)
    pub parent: Option<DualModuleParallelUnitWeak<SerialModule>>,
    /// elevated dual nodes: whose descendent not on the representative path of a dual node
    pub elevated_dual_nodes: PtrWeakHashSet<DualNodeWeak>,
    /// an empty sync requests queue just to implement the trait
    pub empty_sync_request: Vec<SyncRequest>,
    /// run things in thread pool
    pub enable_parallel_execution: bool,
    /// whether any descendant unit has active dual node
    pub has_active_node: bool,
}
112
/// strong pointer to a parallel unit, guarded by a manually-managed lock
pub type DualModuleParallelUnitPtr<SerialModule> = ArcManualSafeLock<DualModuleParallelUnit<SerialModule>>;
/// weak pointer to a parallel unit, used for the parent/children links in the fusion tree
pub type DualModuleParallelUnitWeak<SerialModule> = WeakManualSafeLock<DualModuleParallelUnit<SerialModule>>;
115
116impl<SerialModule: DualModuleImpl + Send + Sync> std::fmt::Debug for DualModuleParallelUnitPtr<SerialModule> {
117    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
118        let unit = self.read_recursive();
119        write!(f, "{}", unit.unit_index)
120    }
121}
122
123impl<SerialModule: DualModuleImpl + Send + Sync> std::fmt::Debug for DualModuleParallelUnitWeak<SerialModule> {
124    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
125        self.upgrade_force().fmt(f)
126    }
127}
128
129impl<SerialModule: DualModuleImpl + Send + Sync> DualModuleParallel<SerialModule> {
    /// recommended way to create a new instance, given a customized configuration;
    /// builds one partitioned serial dual module per unit (constructed in parallel inside
    /// the thread pool), then wires up the parent/children tree and mirrored-vertex bookkeeping
    #[allow(clippy::unnecessary_cast)]
    pub fn new_config(
        initializer: &SolverInitializer,
        partition_info: &PartitionInfo,
        config: DualModuleParallelConfig,
    ) -> Self {
        let partition_info = Arc::new(partition_info.clone());
        // a thread_pool_size of 0 lets rayon choose the thread count (number of CPU cores)
        let mut thread_pool_builder = rayon::ThreadPoolBuilder::new();
        if config.thread_pool_size != 0 {
            thread_pool_builder = thread_pool_builder.num_threads(config.thread_pool_size);
        }
        let thread_pool = thread_pool_builder.build().expect("creating thread pool failed");
        let mut units = vec![];
        let unit_count = partition_info.units.len();
        let complete_graph = CompleteGraph::new(initializer.vertex_num, &initializer.weighted_edges); // build the graph to construct the NN data structure
        let mut contained_vertices_vec: Vec<BTreeSet<VertexIndex>> = vec![]; // all vertices maintained by each unit
        // mark virtual vertices in a dense boolean vector for O(1) lookup below
        let mut is_vertex_virtual: Vec<_> = (0..initializer.vertex_num).map(|_| false).collect();
        for virtual_vertex in initializer.virtual_vertices.iter() {
            is_vertex_virtual[*virtual_vertex as usize] = true;
        }
        // a partition unit starts enabled iff it is a leaf partition (fusion units are enabled when fused)
        let partition_units: Vec<PartitionUnitPtr> = (0..unit_count)
            .map(|unit_index| {
                PartitionUnitPtr::new_value(PartitionUnit {
                    unit_index,
                    enabled: unit_index < partition_info.config.partitions.len(),
                })
            })
            .collect();
        // for each unit, gather the mirrored vertices and interfaces by walking up its ancestor chain
        let mut partitioned_initializers: Vec<PartitionedSolverInitializer> = (0..unit_count)
            .map(|unit_index| {
                let mut interfaces = vec![];
                let mut current_index = unit_index;
                let owning_range = &partition_info.units[unit_index].owning_range;
                let mut contained_vertices = BTreeSet::new();
                for vertex_index in owning_range.iter() {
                    contained_vertices.insert(vertex_index);
                }
                // walk from this unit up to the root of the fusion tree
                while let Some(parent_index) = &partition_info.units[current_index].parent {
                    let mut mirror_vertices = vec![];
                    if config.edges_in_fusion_unit {
                        // mirror only the ancestor-owned vertices incident to this unit's owning range
                        for vertex_index in partition_info.units[*parent_index].owning_range.iter() {
                            let mut is_incident = false;
                            for (peer_index, _) in complete_graph.vertices[vertex_index as usize].edges.iter() {
                                if owning_range.contains(*peer_index) {
                                    is_incident = true;
                                    break;
                                }
                            }
                            if is_incident {
                                mirror_vertices.push((vertex_index, is_vertex_virtual[vertex_index as usize]));
                                contained_vertices.insert(vertex_index);
                            }
                        }
                    } else {
                        // first check if there EXISTS any vertex that's adjacent of it's contains vertex
                        let mut has_incident = false;
                        for vertex_index in partition_info.units[*parent_index].owning_range.iter() {
                            for (peer_index, _) in complete_graph.vertices[vertex_index as usize].edges.iter() {
                                if contained_vertices.contains(peer_index) {
                                    // important diff: as long as it has an edge with contained vertex, add it
                                    has_incident = true;
                                    break;
                                }
                            }
                            if has_incident {
                                break;
                            }
                        }
                        if has_incident {
                            // add all vertices as mirrored
                            for vertex_index in partition_info.units[*parent_index].owning_range.iter() {
                                mirror_vertices.push((vertex_index, is_vertex_virtual[vertex_index as usize]));
                                contained_vertices.insert(vertex_index);
                            }
                        }
                    }
                    if !mirror_vertices.is_empty() {
                        // only add non-empty mirrored parents is enough
                        interfaces.push((partition_units[*parent_index].downgrade(), mirror_vertices));
                    }
                    current_index = *parent_index;
                }
                contained_vertices_vec.push(contained_vertices);
                PartitionedSolverInitializer {
                    unit_index,
                    vertex_num: initializer.vertex_num,
                    edge_num: initializer.weighted_edges.len(),
                    owning_range: *owning_range,
                    // leaf partitions have no owning interface; fusion units own their partition unit
                    owning_interface: if unit_index < partition_info.config.partitions.len() {
                        None
                    } else {
                        Some(partition_units[unit_index].downgrade())
                    },
                    weighted_edges: vec![], // to be filled later
                    interfaces,
                    virtual_vertices: owning_range
                        .iter()
                        .filter(|vertex_index| is_vertex_virtual[*vertex_index as usize])
                        .collect(),
                } // note that all fields can be modified later
            })
            .collect();
        // assign each edge to its unique partition
        for (edge_index, &(i, j, weight)) in initializer.weighted_edges.iter().enumerate() {
            assert_ne!(i, j, "invalid edge from and to the same vertex {}", i);
            assert!(
                i < initializer.vertex_num,
                "edge ({}, {}) connected to an invalid vertex {}",
                i,
                j,
                i
            );
            assert!(
                j < initializer.vertex_num,
                "edge ({}, {}) connected to an invalid vertex {}",
                i,
                j,
                j
            );
            let i_unit_index = partition_info.vertex_to_owning_unit[i as usize];
            let j_unit_index = partition_info.vertex_to_owning_unit[j as usize];
            // either left is ancestor of right or right is ancestor of left, otherwise the edge is invalid (because crossing two independent partitions)
            let is_i_ancestor = partition_info.units[i_unit_index].descendants.contains(&j_unit_index);
            let is_j_ancestor = partition_info.units[j_unit_index].descendants.contains(&i_unit_index);
            assert!(
                is_i_ancestor || is_j_ancestor || i_unit_index == j_unit_index,
                "violating edge ({}, {}) crossing two independent partitions {} and {}",
                i,
                j,
                i_unit_index,
                j_unit_index
            );
            let ancestor_unit_index = if is_i_ancestor { i_unit_index } else { j_unit_index };
            let descendant_unit_index = if is_i_ancestor { j_unit_index } else { i_unit_index };
            if config.edges_in_fusion_unit {
                // the edge should be added to the descendant, and it's guaranteed that the descendant unit contains (although not necessarily owned) the vertex
                partitioned_initializers[descendant_unit_index]
                    .weighted_edges
                    .push((i, j, weight, edge_index as EdgeIndex));
            } else {
                // add edge to every unit from the descendant (including) and the ancestor (excluding) who mirrored the vertex
                if ancestor_unit_index < partition_info.config.partitions.len() {
                    // leaf unit holds every unit
                    partitioned_initializers[descendant_unit_index].weighted_edges.push((
                        i,
                        j,
                        weight,
                        edge_index as EdgeIndex,
                    ));
                } else {
                    // iterate every leaf unit of the `descendant_unit_index` to see if adding the edge or not
                    // bundle the loop-invariant data so the recursive helper takes a single reference
                    struct DfsInfo<'a> {
                        partition_config: &'a PartitionConfig,
                        partition_info: &'a PartitionInfo,
                        i: VertexIndex,
                        j: VertexIndex,
                        weight: Weight,
                        contained_vertices_vec: &'a Vec<BTreeSet<VertexIndex>>,
                        edge_index: EdgeIndex,
                    }
                    let dfs_info = DfsInfo {
                        partition_config: &partition_info.config,
                        partition_info: &partition_info,
                        i,
                        j,
                        weight,
                        contained_vertices_vec: &contained_vertices_vec,
                        edge_index: edge_index as EdgeIndex,
                    };
                    // recursively descend through fusion units' children; add the edge to each
                    // leaf unit that contains both endpoints (containing only one is an invariant violation)
                    fn dfs_add(
                        unit_index: usize,
                        dfs_info: &DfsInfo,
                        partitioned_initializers: &mut Vec<PartitionedSolverInitializer>,
                    ) {
                        if unit_index >= dfs_info.partition_config.partitions.len() {
                            let (left_index, right_index) = &dfs_info.partition_info.units[unit_index]
                                .children
                                .expect("fusion unit must have children");
                            dfs_add(*left_index, dfs_info, partitioned_initializers);
                            dfs_add(*right_index, dfs_info, partitioned_initializers);
                        } else {
                            let contain_i = dfs_info.contained_vertices_vec[unit_index].contains(&dfs_info.i);
                            let contain_j = dfs_info.contained_vertices_vec[unit_index].contains(&dfs_info.j);
                            assert!(
                                !(contain_i ^ contain_j),
                                "{} and {} must either be both contained or not contained by {}",
                                dfs_info.i,
                                dfs_info.j,
                                unit_index
                            );
                            if contain_i {
                                partitioned_initializers[unit_index].weighted_edges.push((
                                    dfs_info.i,
                                    dfs_info.j,
                                    dfs_info.weight,
                                    dfs_info.edge_index,
                                ));
                            }
                        }
                    }
                    dfs_add(descendant_unit_index, &dfs_info, &mut partitioned_initializers);
                }
            }
        }
        // println!("partitioned_initializers: {:?}", partitioned_initializers);
        // construct the partitioned serial modules in parallel inside the thread pool
        thread_pool.scope(|_| {
            (0..unit_count)
                .into_par_iter()
                .map(|unit_index| {
                    // println!("unit_index: {unit_index}");
                    let dual_module = SerialModule::new_partitioned(&partitioned_initializers[unit_index]);
                    DualModuleParallelUnitPtr::new_wrapper(
                        dual_module,
                        unit_index,
                        Arc::clone(&partition_info),
                        partition_units[unit_index].clone(),
                        config.enable_parallel_execution,
                    )
                })
                .collect_into_vec(&mut units);
        });
        // fill in the children and parent references
        for unit_index in 0..unit_count {
            let mut unit = units[unit_index].write();
            if let Some((left_children_index, right_children_index)) = &partition_info.units[unit_index].children {
                unit.children = Some((
                    units[*left_children_index].downgrade(),
                    units[*right_children_index].downgrade(),
                ))
            }
            if let Some(parent_index) = &partition_info.units[unit_index].parent {
                unit.parent = Some(units[*parent_index].downgrade());
            }
        }
        // fill in the extra_descendant_mirrored_vertices
        for unit_index in 0..unit_count {
            lock_write!(unit, units[unit_index]);
            let whole_range = &partition_info.units[unit_index].whole_range;
            let partitioned_initializer = &partitioned_initializers[unit_index];
            // vertices mirrored from ancestors that fall outside this unit's whole range
            for (_, interface_vertices) in partitioned_initializer.interfaces.iter() {
                for (vertex_index, _) in interface_vertices.iter() {
                    if !whole_range.contains(*vertex_index) {
                        unit.extra_descendant_mirrored_vertices.insert(*vertex_index);
                    }
                }
            }
            // also inherit the children's extra mirrored vertices still outside this unit's whole range;
            // NOTE(review): this relies on children having smaller unit indices than their parent so they
            // are processed first by this loop — confirm against the partition construction
            if let Some((left_children_weak, right_children_weak)) = unit.children.clone() {
                for child_weak in [left_children_weak, right_children_weak] {
                    // note: although iterating over HashSet is not performance optimal, this only happens at initialization and thus it's fine
                    for vertex_index in child_weak
                        .upgrade_force()
                        .read_recursive()
                        .extra_descendant_mirrored_vertices
                        .iter()
                    {
                        if !whole_range.contains(*vertex_index) {
                            unit.extra_descendant_mirrored_vertices.insert(*vertex_index);
                        }
                    }
                }
            }
            // println!("{} extra_descendant_mirrored_vertices: {:?}", unit.unit_index, unit.extra_descendant_mirrored_vertices);
        }
        Self {
            units,
            config,
            partition_info,
            thread_pool: Arc::new(thread_pool),
            empty_sync_request: vec![],
        }
    }
402
403    /// find the active ancestor to handle this dual node (should be unique, i.e. any time only one ancestor is active)
404    #[inline(never)]
405    pub fn find_active_ancestor(&self, dual_node_ptr: &DualNodePtr) -> DualModuleParallelUnitPtr<SerialModule> {
406        self.find_active_ancestor_option(dual_node_ptr).unwrap()
407    }
408
    /// find the first active ancestor unit that should handle this dual node, walking from the
    /// unit owning the node's representative vertex up the parent chain; returns `None` if no
    /// unit on that path is active
    #[allow(clippy::unnecessary_cast)]
    pub fn find_active_ancestor_option(
        &self,
        dual_node_ptr: &DualNodePtr,
    ) -> Option<DualModuleParallelUnitPtr<SerialModule>> {
        // find the first active ancestor unit that should handle this dual node
        let representative_vertex = dual_node_ptr.get_representative_vertex();
        let owning_unit_index = self.partition_info.vertex_to_owning_unit[representative_vertex as usize];
        let mut owning_unit_ptr = self.units[owning_unit_index].clone();
        loop {
            let owning_unit = owning_unit_ptr.read_recursive();
            if owning_unit.is_active {
                break; // find an active unit
            }
            if let Some(parent_weak) = &owning_unit.parent {
                let parent_owning_unit_ptr = parent_weak.upgrade_force();
                // the read guard borrows from owning_unit_ptr, so it must be dropped
                // before the pointer can be reassigned to the parent
                drop(owning_unit);
                owning_unit_ptr = parent_owning_unit_ptr;
            } else {
                // reached the root without finding an active unit
                return None;
            }
        }
        Some(owning_unit_ptr)
    }
433
    /// statically fuse them all, may be called at any state (meaning each unit may not necessarily be solved locally);
    /// visits units in index order and fuses the children of every fusion unit that still has active children
    pub fn static_fuse_all(&mut self) {
        for unit_ptr in self.units.iter() {
            lock_write!(unit, unit_ptr);
            if let Some((left_child_weak, right_child_weak)) = &unit.children {
                {
                    // ignore already fused children and work on others;
                    // inner scope so the children's read guards are released before static_fuse
                    // takes write locks on them
                    let left_child_ptr = left_child_weak.upgrade_force();
                    let right_child_ptr = right_child_weak.upgrade_force();
                    let left_child = left_child_ptr.read_recursive();
                    let right_child = right_child_ptr.read_recursive();
                    if !left_child.is_active && !right_child.is_active {
                        continue; // already fused, it's ok to just ignore
                    }
                    debug_assert!(
                        left_child.is_active && right_child.is_active,
                        "children must be active at the same time if fusing all together"
                    );
                }
                unit.static_fuse();
            }
        }
    }
457}
458
impl<SerialModule: DualModuleImpl + Send + Sync> DualModuleImpl for DualModuleParallel<SerialModule> {
    /// initialize the dual module, which is supposed to be reused for multiple decoding tasks with the same structure;
    /// uses a trivial (single) partition and the default configuration
    fn new_empty(initializer: &SolverInitializer) -> Self {
        Self::new_config(
            initializer,
            &PartitionConfig::new(initializer.vertex_num).info(),
            DualModuleParallelConfig::default(),
        )
    }

    /// clear all growth and existing dual nodes; resets every unit in parallel and restores
    /// the initial active/enabled state
    #[inline(never)]
    fn clear(&mut self) {
        self.thread_pool.scope(|_| {
            self.units.par_iter().enumerate().for_each(|(unit_idx, unit_ptr)| {
                lock_write!(unit, unit_ptr);
                unit.clear();
                unit.is_active = unit_idx < self.partition_info.config.partitions.len(); // only partitioned serial modules are active at the beginning
                unit.partition_unit.write().enabled = false;
                unit.elevated_dual_nodes.clear();
            });
        })
    }

    // although not the intended way to use it, we do support these common APIs for compatibility with normal primal modules

    /// add a dual node on the unique active ancestor unit covering its representative vertex
    fn add_dual_node(&mut self, dual_node_ptr: &DualNodePtr) {
        let unit_ptr = self.find_active_ancestor(dual_node_ptr);
        self.thread_pool.scope(|_| {
            lock_write!(unit, unit_ptr);
            unit.add_dual_node(dual_node_ptr);
        })
    }

    /// remove a blossom on the active ancestor unit that handles it
    fn remove_blossom(&mut self, dual_node_ptr: DualNodePtr) {
        let unit_ptr = self.find_active_ancestor(&dual_node_ptr);
        self.thread_pool.scope(|_| {
            lock_write!(unit, unit_ptr);
            unit.remove_blossom(dual_node_ptr);
        })
    }

    /// set the grow state of a dual node on the active ancestor unit that handles it
    fn set_grow_state(&mut self, dual_node_ptr: &DualNodePtr, grow_state: DualNodeGrowState) {
        let unit_ptr = self.find_active_ancestor(dual_node_ptr);
        self.thread_pool.scope(|_| {
            lock_write!(unit, unit_ptr);
            unit.set_grow_state(dual_node_ptr, grow_state);
        })
    }

    /// compute the maximum update length of a single dual node, delegated to its active ancestor unit
    fn compute_maximum_update_length_dual_node(
        &mut self,
        dual_node_ptr: &DualNodePtr,
        is_grow: bool,
        simultaneous_update: bool,
    ) -> MaxUpdateLength {
        let unit_ptr = self.find_active_ancestor(dual_node_ptr);
        self.thread_pool.scope(|_| {
            lock_write!(unit, unit_ptr);
            unit.compute_maximum_update_length_dual_node(dual_node_ptr, is_grow, simultaneous_update)
        })
    }

    /// compute the maximum update length over all active units (in parallel), then merge the
    /// per-unit results into a single group result
    fn compute_maximum_update_length(&mut self) -> GroupMaxUpdateLength {
        self.thread_pool.scope(|_| {
            let results: Vec<_> = self
                .units
                .par_iter()
                .filter_map(|unit_ptr| {
                    lock_write!(unit, unit_ptr);
                    if !unit.is_active {
                        return None; // inactive units do not contribute
                    }
                    Some(unit.compute_maximum_update_length())
                })
                .collect();
            let mut group_max_update_length = GroupMaxUpdateLength::new();
            for local_group_max_update_length in results.into_iter() {
                group_max_update_length.extend(local_group_max_update_length);
            }
            group_max_update_length
        })
    }

    /// grow a single dual node by `length`, delegated to its active ancestor unit
    fn grow_dual_node(&mut self, dual_node_ptr: &DualNodePtr, length: Weight) {
        let unit_ptr = self.find_active_ancestor(dual_node_ptr);
        self.thread_pool.scope(|_| {
            lock_write!(unit, unit_ptr);
            unit.grow_dual_node(dual_node_ptr, length);
        })
    }

    /// grow every active unit by `length`, in parallel
    fn grow(&mut self, length: Weight) {
        self.thread_pool.scope(|_| {
            self.units.par_iter().for_each(|unit_ptr| {
                lock_write!(unit, unit_ptr);
                if !unit.is_active {
                    return;
                }
                unit.grow(length);
            });
        })
    }

    /// apply edge weight modifications to every active unit, in parallel
    fn load_edge_modifier(&mut self, edge_modifier: &[(EdgeIndex, Weight)]) {
        self.thread_pool.scope(|_| {
            self.units.par_iter().for_each(|unit_ptr| {
                lock_write!(unit, unit_ptr);
                if !unit.is_active {
                    return;
                }
                unit.load_edge_modifier(edge_modifier);
            });
        })
    }

    /// prepare a circle of nodes to shrink, delegated to the active ancestor of the first node;
    /// returns the always-empty sync request queue just to satisfy the trait signature
    fn prepare_nodes_shrink(&mut self, nodes_circle: &[DualNodePtr]) -> &mut Vec<SyncRequest> {
        let unit_ptr = self.find_active_ancestor(&nodes_circle[0]);
        self.thread_pool.scope(|_| {
            lock_write!(unit, unit_ptr);
            unit.prepare_nodes_shrink(nodes_circle);
        });
        &mut self.empty_sync_request
    }
}
584
585impl<SerialModule: DualModuleImpl + Send + Sync> DualModuleParallelImpl for DualModuleParallel<SerialModule> {
586    type UnitType = DualModuleParallelUnit<SerialModule>;
587
588    fn get_unit(&self, unit_index: usize) -> ArcManualSafeLock<Self::UnitType> {
589        self.units[unit_index].clone()
590    }
591}
592
593/*
594Implementing visualization functions
595*/
596
597impl<SerialModule: DualModuleImpl + FusionVisualizer + Send + Sync> FusionVisualizer for DualModuleParallel<SerialModule> {
598    fn snapshot(&self, abbrev: bool) -> serde_json::Value {
599        // do the sanity check first before taking snapshot
600        // self.sanity_check().unwrap();
601        let mut value = json!({});
602        for unit_ptr in self.units.iter() {
603            let unit = unit_ptr.read_recursive();
604            if !unit.is_active {
605                continue;
606            } // do not visualize inactive units
607            let value_2 = unit.snapshot(abbrev);
608            snapshot_combine_values(&mut value, value_2, abbrev);
609        }
610        value
611    }
612}
613
614impl<SerialModule: DualModuleImpl + FusionVisualizer + Send + Sync> FusionVisualizer
615    for DualModuleParallelUnit<SerialModule>
616{
617    fn snapshot(&self, abbrev: bool) -> serde_json::Value {
618        let mut value = self.serial_module.snapshot(abbrev);
619        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
620            snapshot_combine_values(
621                &mut value,
622                left_child_weak.upgrade_force().read_recursive().snapshot(abbrev),
623                abbrev,
624            );
625            snapshot_combine_values(
626                &mut value,
627                right_child_weak.upgrade_force().read_recursive().snapshot(abbrev),
628                abbrev,
629            );
630        }
631        value
632    }
633}
634
635impl<SerialModule: DualModuleImpl + Send + Sync> DualModuleParallelUnit<SerialModule> {
    /// statically fuse the children of this unit: mark this unit active, both children inactive,
    /// and enable the shared partition unit
    pub fn static_fuse(&mut self) {
        debug_assert!(!self.is_active, "cannot fuse the child an already active unit");
        let (left_child_ptr, right_child_ptr) = (
            self.children.as_ref().unwrap().0.upgrade_force(),
            self.children.as_ref().unwrap().1.upgrade_force(),
        );
        // write locks on both children are held for the whole state transition
        let mut left_child = left_child_ptr.write();
        let mut right_child = right_child_ptr.write();
        debug_assert!(left_child.is_active && right_child.is_active, "cannot fuse inactive pairs");
        // update active state
        self.is_active = true;
        left_child.is_active = false;
        right_child.is_active = false;
        // set partition unit as enabled
        let mut partition_unit = self.partition_unit.write();
        partition_unit.enabled = true;
    }
654
655    /// fuse the children of this unit and also fuse the interfaces of them
656    pub fn fuse(
657        &mut self,
658        parent_interface: &DualModuleInterfacePtr,
659        children_interfaces: (&DualModuleInterfacePtr, &DualModuleInterfacePtr),
660    ) {
661        self.static_fuse();
662        let (left_interface, right_interface) = children_interfaces;
663        let right_child_ptr = self.children.as_ref().unwrap().1.upgrade_force();
664        lock_write!(right_child, right_child_ptr);
665        // change the index of dual nodes in the right children
666        let bias = left_interface.read_recursive().nodes_count();
667        right_child.iterative_bias_dual_node_index(bias);
668        parent_interface.fuse(left_interface, right_interface);
669    }
670
671    pub fn iterative_bias_dual_node_index(&mut self, bias: NodeIndex) {
672        // depth-first search
673        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
674            if self.enable_parallel_execution {
675                rayon::join(
676                    || {
677                        left_child_weak.upgrade_force().write().iterative_bias_dual_node_index(bias);
678                    },
679                    || {
680                        right_child_weak.upgrade_force().write().iterative_bias_dual_node_index(bias);
681                    },
682                );
683            } else {
684                left_child_weak.upgrade_force().write().iterative_bias_dual_node_index(bias);
685                right_child_weak.upgrade_force().write().iterative_bias_dual_node_index(bias);
686            }
687        }
688        // my serial module
689        self.serial_module.bias_dual_node_index(bias);
690    }
691
692    /// if any descendant unit mirror or own the vertex
693    pub fn is_vertex_in_descendant(&self, vertex_index: VertexIndex) -> bool {
694        self.whole_range.contains(vertex_index) || self.extra_descendant_mirrored_vertices.contains(&vertex_index)
695    }
696
697    /// no need to deduplicate the events: the result will always be consistent with the last one
698    fn execute_sync_events(&mut self, sync_requests: &[SyncRequest]) {
699        // println!("sync_requests: {sync_requests:?}");
700        for sync_request in sync_requests.iter() {
701            sync_request.update();
702            self.execute_sync_event(sync_request);
703        }
704    }
705
706    /// iteratively prepare all growing and shrinking and append the sync requests
707    fn iterative_prepare_all(&mut self, sync_requests: &mut Vec<SyncRequest>) {
708        if !self.has_active_node {
709            return; // early return to avoid going through all units
710        }
711        // depth-first search
712        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
713            if self.enable_parallel_execution {
714                let mut sync_requests_2 = vec![];
715                rayon::join(
716                    || {
717                        left_child_weak.upgrade_force().write().iterative_prepare_all(sync_requests);
718                    },
719                    || {
720                        right_child_weak
721                            .upgrade_force()
722                            .write()
723                            .iterative_prepare_all(&mut sync_requests_2);
724                    },
725                );
726                sync_requests.append(&mut sync_requests_2);
727            } else {
728                left_child_weak.upgrade_force().write().iterative_prepare_all(sync_requests);
729                right_child_weak.upgrade_force().write().iterative_prepare_all(sync_requests);
730            }
731        }
732        // my serial module
733        let local_sync_requests = self.serial_module.prepare_all();
734        sync_requests.append(local_sync_requests);
735    }
736
737    /// iteratively set grow state
738    fn iterative_set_grow_state(
739        &mut self,
740        dual_node_ptr: &DualNodePtr,
741        grow_state: DualNodeGrowState,
742        representative_vertex: VertexIndex,
743    ) {
744        if !self.whole_range.contains(representative_vertex) && !self.elevated_dual_nodes.contains(dual_node_ptr) {
745            return; // no descendant related to this dual node
746        }
747        if grow_state != DualNodeGrowState::Stay {
748            self.has_active_node = true;
749        }
750        // depth-first search
751        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
752            left_child_weak.upgrade_force().write().iterative_set_grow_state(
753                dual_node_ptr,
754                grow_state,
755                representative_vertex,
756            );
757            right_child_weak.upgrade_force().write().iterative_set_grow_state(
758                dual_node_ptr,
759                grow_state,
760                representative_vertex,
761            );
762        }
763        if self.owning_range.contains(representative_vertex) || self.serial_module.contains_dual_node(dual_node_ptr) {
764            self.serial_module.set_grow_state(dual_node_ptr, grow_state);
765        }
766    }
767
768    /// check if elevated_dual_nodes contains any dual node in the list
769    pub fn elevated_dual_nodes_contains_any(&self, nodes: &[DualNodePtr]) -> bool {
770        for node_ptr in nodes.iter() {
771            if self.elevated_dual_nodes.contains(node_ptr) {
772                return true;
773            }
774        }
775        false
776    }
777
778    /// prepare the initial shrink of a blossom
779    fn iterative_prepare_nodes_shrink(
780        &mut self,
781        nodes_circle: &[DualNodePtr],
782        nodes_circle_vertices: &[VertexIndex],
783        sync_requests: &mut Vec<SyncRequest>,
784    ) {
785        if !self.whole_range.contains_any(nodes_circle_vertices) && !self.elevated_dual_nodes_contains_any(nodes_circle) {
786            return; // no descendant related to this dual node
787        }
788        self.has_active_node = true;
789        // depth-first search
790        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
791            if self.enable_parallel_execution {
792                let mut sync_requests_2 = vec![];
793                rayon::join(
794                    || {
795                        left_child_weak.upgrade_force().write().iterative_prepare_nodes_shrink(
796                            nodes_circle,
797                            nodes_circle_vertices,
798                            sync_requests,
799                        );
800                    },
801                    || {
802                        right_child_weak.upgrade_force().write().iterative_prepare_nodes_shrink(
803                            nodes_circle,
804                            nodes_circle_vertices,
805                            &mut sync_requests_2,
806                        );
807                    },
808                );
809                sync_requests.append(&mut sync_requests_2);
810            } else {
811                left_child_weak.upgrade_force().write().iterative_prepare_nodes_shrink(
812                    nodes_circle,
813                    nodes_circle_vertices,
814                    sync_requests,
815                );
816                right_child_weak.upgrade_force().write().iterative_prepare_nodes_shrink(
817                    nodes_circle,
818                    nodes_circle_vertices,
819                    sync_requests,
820                );
821            }
822        }
823        let local_sync_requests = self.serial_module.prepare_nodes_shrink(nodes_circle);
824        sync_requests.append(local_sync_requests);
825    }
826
827    fn iterative_add_blossom(
828        &mut self,
829        blossom_ptr: &DualNodePtr,
830        nodes_circle: &[DualNodePtr],
831        representative_vertex: VertexIndex,
832        nodes_circle_vertices: &[VertexIndex],
833    ) {
834        if !self.whole_range.contains_any(nodes_circle_vertices) && !self.elevated_dual_nodes_contains_any(nodes_circle) {
835            return; // no descendant related to this dual node
836        }
837        self.has_active_node = true;
838        // depth-first search
839        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
840            if self.enable_parallel_execution {
841                rayon::join(
842                    || {
843                        left_child_weak.upgrade_force().write().iterative_add_blossom(
844                            blossom_ptr,
845                            nodes_circle,
846                            representative_vertex,
847                            nodes_circle_vertices,
848                        );
849                    },
850                    || {
851                        right_child_weak.upgrade_force().write().iterative_add_blossom(
852                            blossom_ptr,
853                            nodes_circle,
854                            representative_vertex,
855                            nodes_circle_vertices,
856                        );
857                    },
858                );
859            } else {
860                left_child_weak.upgrade_force().write().iterative_add_blossom(
861                    blossom_ptr,
862                    nodes_circle,
863                    representative_vertex,
864                    nodes_circle_vertices,
865                );
866                right_child_weak.upgrade_force().write().iterative_add_blossom(
867                    blossom_ptr,
868                    nodes_circle,
869                    representative_vertex,
870                    nodes_circle_vertices,
871                );
872            }
873        }
874        if self.owning_range.contains_any(nodes_circle_vertices) || self.serial_module.contains_dual_nodes_any(nodes_circle)
875        {
876            self.serial_module.add_blossom(blossom_ptr);
877        }
878        // if I'm not on the representative path of this dual node, I need to register the propagated_dual_node
879        // note that I don't need to register propagated_grandson_dual_node because it's never gonna grow inside the blossom
880        if !self.whole_range.contains(representative_vertex) {
881            self.elevated_dual_nodes.insert(blossom_ptr.clone());
882        }
883    }
884
885    fn iterative_add_defect_node(&mut self, dual_node_ptr: &DualNodePtr, vertex_index: VertexIndex) {
886        // if the vertex is not hold by any descendant, simply return
887        if !self.is_vertex_in_descendant(vertex_index) {
888            return;
889        }
890        self.has_active_node = true;
891        // println!("sync_prepare_growth_update_sync_event: vertex {}, unit index {}", sync_event.vertex_index, self.unit_index);
892        // depth-first search
893        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
894            if self.enable_parallel_execution {
895                rayon::join(
896                    || {
897                        left_child_weak
898                            .upgrade_force()
899                            .write()
900                            .iterative_add_defect_node(dual_node_ptr, vertex_index);
901                    },
902                    || {
903                        right_child_weak
904                            .upgrade_force()
905                            .write()
906                            .iterative_add_defect_node(dual_node_ptr, vertex_index);
907                    },
908                );
909            } else {
910                left_child_weak
911                    .upgrade_force()
912                    .write()
913                    .iterative_add_defect_node(dual_node_ptr, vertex_index);
914                right_child_weak
915                    .upgrade_force()
916                    .write()
917                    .iterative_add_defect_node(dual_node_ptr, vertex_index);
918            }
919        }
920        // update on my serial module
921        if self.serial_module.contains_vertex(vertex_index) {
922            self.serial_module.add_defect_node(dual_node_ptr);
923        }
924        // if I'm not on the representative path of this dual node, I need to register the propagated_dual_node
925        // note that I don't need to register propagated_grandson_dual_node because it's never gonna grow inside the blossom
926        if !self.whole_range.contains(vertex_index) {
927            self.elevated_dual_nodes.insert(dual_node_ptr.clone());
928        }
929    }
930
931    fn iterative_compute_maximum_update_length(&mut self, group_max_update_length: &mut GroupMaxUpdateLength) -> bool {
932        // early terminate if no active dual nodes anywhere in the descendant
933        if !self.has_active_node {
934            return false;
935        }
936        let serial_module_group_max_update_length = self.serial_module.compute_maximum_update_length();
937        if !serial_module_group_max_update_length.is_active() {
938            self.has_active_node = false;
939        }
940        group_max_update_length.extend(serial_module_group_max_update_length);
941        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
942            let (left_child_has_active_node, right_child_has_active_node) = if self.enable_parallel_execution {
943                let mut group_max_update_length_2 = GroupMaxUpdateLength::new();
944                let (left_child_has_active_node, right_child_has_active_node) = rayon::join(
945                    || {
946                        left_child_weak
947                            .upgrade_force()
948                            .write()
949                            .iterative_compute_maximum_update_length(group_max_update_length)
950                    },
951                    || {
952                        right_child_weak
953                            .upgrade_force()
954                            .write()
955                            .iterative_compute_maximum_update_length(&mut group_max_update_length_2)
956                    },
957                );
958                group_max_update_length.extend(group_max_update_length_2);
959                (left_child_has_active_node, right_child_has_active_node)
960            } else {
961                (
962                    left_child_weak
963                        .upgrade_force()
964                        .write()
965                        .iterative_compute_maximum_update_length(group_max_update_length),
966                    right_child_weak
967                        .upgrade_force()
968                        .write()
969                        .iterative_compute_maximum_update_length(group_max_update_length),
970                )
971            };
972            if left_child_has_active_node || right_child_has_active_node {
973                self.has_active_node = true
974            }
975        }
976        self.has_active_node
977    }
978
979    fn iterative_grow_dual_node(&mut self, dual_node_ptr: &DualNodePtr, length: Weight, representative_vertex: VertexIndex) {
980        if !self.whole_range.contains(representative_vertex) && !self.elevated_dual_nodes.contains(dual_node_ptr) {
981            return; // no descendant related to this dual node
982        }
983        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
984            if self.enable_parallel_execution {
985                rayon::join(
986                    || {
987                        left_child_weak.upgrade_force().write().iterative_grow_dual_node(
988                            dual_node_ptr,
989                            length,
990                            representative_vertex,
991                        );
992                    },
993                    || {
994                        right_child_weak.upgrade_force().write().iterative_grow_dual_node(
995                            dual_node_ptr,
996                            length,
997                            representative_vertex,
998                        );
999                    },
1000                );
1001            } else {
1002                left_child_weak.upgrade_force().write().iterative_grow_dual_node(
1003                    dual_node_ptr,
1004                    length,
1005                    representative_vertex,
1006                );
1007                right_child_weak.upgrade_force().write().iterative_grow_dual_node(
1008                    dual_node_ptr,
1009                    length,
1010                    representative_vertex,
1011                );
1012            }
1013        }
1014        if self.owning_range.contains(representative_vertex) || self.serial_module.contains_dual_node(dual_node_ptr) {
1015            self.serial_module.grow_dual_node(dual_node_ptr, length);
1016        }
1017    }
1018
1019    fn iterative_grow(&mut self, length: Weight) {
1020        // early terminate if no active dual nodes anywhere in the descendant
1021        if !self.has_active_node {
1022            return;
1023        }
1024        self.serial_module.grow(length);
1025        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
1026            if self.enable_parallel_execution {
1027                rayon::join(
1028                    || {
1029                        left_child_weak.upgrade_force().write().iterative_grow(length);
1030                    },
1031                    || {
1032                        right_child_weak.upgrade_force().write().iterative_grow(length);
1033                    },
1034                );
1035            } else {
1036                left_child_weak.upgrade_force().write().iterative_grow(length);
1037                right_child_weak.upgrade_force().write().iterative_grow(length);
1038            }
1039        }
1040    }
1041
1042    fn iterative_remove_blossom(&mut self, dual_node_ptr: &DualNodePtr, representative_vertex: VertexIndex) {
1043        if !self.whole_range.contains(representative_vertex) && !self.elevated_dual_nodes.contains(dual_node_ptr) {
1044            return; // no descendant related to this dual node
1045        }
1046        self.has_active_node = true;
1047        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
1048            if self.enable_parallel_execution {
1049                rayon::join(
1050                    || {
1051                        left_child_weak
1052                            .upgrade_force()
1053                            .write()
1054                            .iterative_remove_blossom(dual_node_ptr, representative_vertex);
1055                    },
1056                    || {
1057                        right_child_weak
1058                            .upgrade_force()
1059                            .write()
1060                            .iterative_remove_blossom(dual_node_ptr, representative_vertex);
1061                    },
1062                );
1063            } else {
1064                left_child_weak
1065                    .upgrade_force()
1066                    .write()
1067                    .iterative_remove_blossom(dual_node_ptr, representative_vertex);
1068                right_child_weak
1069                    .upgrade_force()
1070                    .write()
1071                    .iterative_remove_blossom(dual_node_ptr, representative_vertex);
1072            }
1073        }
1074        if self.owning_range.contains(representative_vertex) || self.serial_module.contains_dual_node(dual_node_ptr) {
1075            self.serial_module.remove_blossom(dual_node_ptr.clone());
1076        }
1077    }
1078}
1079
1080impl<SerialModule: DualModuleImpl + Send + Sync> DualModuleParallelUnitPtr<SerialModule> {
1081    /// create a simple wrapper over a serial dual module
1082    pub fn new_wrapper(
1083        serial_module: SerialModule,
1084        unit_index: usize,
1085        partition_info: Arc<PartitionInfo>,
1086        partition_unit: PartitionUnitPtr,
1087        enable_parallel_execution: bool,
1088    ) -> Self {
1089        let partition_unit_info = &partition_info.units[unit_index];
1090        Self::new_value(DualModuleParallelUnit {
1091            unit_index,
1092            partition_info: partition_info.clone(),
1093            partition_unit,
1094            is_active: partition_unit_info.children.is_none(), // only activate the leaves in the dependency tree
1095            whole_range: partition_unit_info.whole_range,
1096            owning_range: partition_unit_info.owning_range,
1097            extra_descendant_mirrored_vertices: HashSet::new(), // to be filled later
1098            serial_module,
1099            children: None, // to be filled later
1100            parent: None,   // to be filled later
1101            elevated_dual_nodes: PtrWeakHashSet::new(),
1102            empty_sync_request: vec![],
1103            enable_parallel_execution,
1104            has_active_node: true, // by default to true, because children may have active nodes
1105        })
1106    }
1107}
1108
/// We cannot implement async function because a RwLockWriteGuard implements !Send
impl<SerialModule: DualModuleImpl + Send + Sync> DualModuleImpl for DualModuleParallelUnit<SerialModule> {
    /// parallel units must be created through `DualModuleParallel::new`; constructing
    /// one directly from an initializer is a programming error and panics
    fn new_empty(_initializer: &SolverInitializer) -> Self {
        panic!("creating parallel unit directly from initializer is forbidden, use `DualModuleParallel::new` instead");
    }

    /// clear all growth and existing dual nodes
    fn clear(&mut self) {
        // conservatively assume active nodes may exist after a reset
        self.has_active_node = true;
        self.serial_module.clear()
    }

    /// add a new dual node from dual module root
    fn add_dual_node(&mut self, dual_node_ptr: &DualNodePtr) {
        self.has_active_node = true;
        let representative_vertex = dual_node_ptr.get_representative_vertex();
        match &dual_node_ptr.read_recursive().class {
            // fast path: if dual node is a single vertex, then only add to the owning node; single vertex dual node can only add when dual variable = 0
            DualNodeClass::DefectVertex { defect_index } => {
                if self.owning_range.contains(representative_vertex) {
                    // fast path: the most common one
                    self.iterative_add_defect_node(dual_node_ptr, *defect_index);
                } else {
                    // find the one that owns it and add the dual node, and then add the serial_module
                    if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
                        // descend left or right depending on which side of the owning range the vertex lies
                        let mut child_ptr = if representative_vertex < self.owning_range.start() {
                            left_child_weak.upgrade_force()
                        } else {
                            right_child_weak.upgrade_force()
                        };
                        let mut is_owning_dual_node = false;
                        // walk down the tree until the unit whose owning range contains the vertex is found
                        while !is_owning_dual_node {
                            let mut child = child_ptr.write();
                            child.has_active_node = true;
                            debug_assert!(
                                child.whole_range.contains(representative_vertex),
                                "selected child must contains the vertex"
                            );
                            is_owning_dual_node = child.owning_range.contains(representative_vertex);
                            if !is_owning_dual_node {
                                // search for the grandsons
                                let grandson_ptr = if let Some((left_child_weak, right_child_weak)) = child.children.as_ref()
                                {
                                    if representative_vertex < child.owning_range.start() {
                                        left_child_weak.upgrade_force()
                                    } else {
                                        right_child_weak.upgrade_force()
                                    }
                                } else {
                                    unreachable!()
                                };
                                // release the lock on the current node before descending
                                drop(child);
                                child_ptr = grandson_ptr;
                            }
                        }
                        lock_write!(child, child_ptr);
                        child.iterative_add_defect_node(dual_node_ptr, *defect_index);
                    } else {
                        unreachable!()
                    }
                }
                // if it's children mirrors this vertex as well, then it's necessary to add this dual node to those children as well
            }
            // this is a blossom, meaning it's children dual nodes may reside on any path
            DualNodeClass::Blossom { nodes_circle, .. } => {
                // first set all children dual nodes as shrinking, to be safe
                let nodes_circle_ptrs: Vec<_> = nodes_circle.iter().map(|weak| weak.upgrade_force()).collect();
                let nodes_circle_vertices: Vec<_> = nodes_circle
                    .iter()
                    .map(|weak| weak.upgrade_force().get_representative_vertex())
                    .collect();
                self.prepare_nodes_shrink(&nodes_circle_ptrs);
                self.iterative_add_blossom(
                    dual_node_ptr,
                    &nodes_circle_ptrs,
                    representative_vertex,
                    &nodes_circle_vertices,
                );
            }
        }
    }

    /// remove a blossom from every descendant unit related to it
    fn remove_blossom(&mut self, dual_node_ptr: DualNodePtr) {
        let representative_vertex = dual_node_ptr.get_representative_vertex();
        self.iterative_remove_blossom(&dual_node_ptr, representative_vertex);
    }

    /// set the grow state of a dual node on every descendant unit related to it
    fn set_grow_state(&mut self, dual_node_ptr: &DualNodePtr, grow_state: DualNodeGrowState) {
        // find the path towards the owning unit of this dual node, and also try paths towards the elevated
        let representative_vertex = dual_node_ptr.get_representative_vertex();
        debug_assert!(
            self.whole_range.contains(representative_vertex),
            "cannot set growth state of dual node outside of the scope"
        );
        self.iterative_set_grow_state(dual_node_ptr, grow_state, representative_vertex);
    }

    /// compute the maximum update length of a single dual node
    fn compute_maximum_update_length_dual_node(
        &mut self,
        dual_node_ptr: &DualNodePtr,
        is_grow: bool,
        simultaneous_update: bool,
    ) -> MaxUpdateLength {
        // TODO: execute on all nodes that handles this dual node
        let max_update_length =
            self.serial_module
                .compute_maximum_update_length_dual_node(dual_node_ptr, is_grow, simultaneous_update);
        if !(self.children.is_none() && self.is_active) {
            // for those base partitions without being fused, we don't need to update
            max_update_length.update(); // only necessary after involved in fusion
        }
        max_update_length
    }

    /// compute the maximum update length over the whole subtree
    fn compute_maximum_update_length(&mut self) -> GroupMaxUpdateLength {
        // first prepare all dual node for growth and shrink accordingly and synchronize them
        self.prepare_all();
        // then do the functions independently
        let mut group_max_update_length = GroupMaxUpdateLength::new();
        self.iterative_compute_maximum_update_length(&mut group_max_update_length);
        if !(self.children.is_none() && self.is_active) {
            // for those base partitions without being fused, we don't need to update
            group_max_update_length.update(); // only necessary after involved in fusion
        }
        group_max_update_length
    }

    /// grow a specific dual node by `length` on every related unit
    fn grow_dual_node(&mut self, dual_node_ptr: &DualNodePtr, length: Weight) {
        let representative_vertex = dual_node_ptr.get_representative_vertex();
        debug_assert!(
            self.whole_range.contains(representative_vertex),
            "cannot grow dual node outside of the scope"
        );
        self.iterative_grow_dual_node(dual_node_ptr, length, representative_vertex);
    }

    /// grow every active dual node in the subtree by `length`
    fn grow(&mut self, length: Weight) {
        self.iterative_grow(length);
    }

    /// load edge modifiers; currently only applied to this unit's own serial module
    fn load_edge_modifier(&mut self, edge_modifier: &[(EdgeIndex, Weight)]) {
        // TODO: split the edge modifier and then load them to individual descendant units
        // hint: each edge could appear in any unit that mirrors the two vertices
        self.serial_module.load_edge_modifier(edge_modifier)
    }

    /// prepare the shrink of a set of dual nodes, looping until no more sync
    /// requests are generated (a fixpoint); the returned vector is always empty
    fn prepare_nodes_shrink(&mut self, nodes_circle: &[DualNodePtr]) -> &mut Vec<SyncRequest> {
        let nodes_circle_vertices: Vec<_> = nodes_circle.iter().map(|ptr| ptr.get_representative_vertex()).collect();
        let mut sync_requests = vec![];
        // iterate until the generated sync requests settle down
        loop {
            self.iterative_prepare_nodes_shrink(nodes_circle, &nodes_circle_vertices, &mut sync_requests);
            if sync_requests.is_empty() {
                break;
            }
            self.execute_sync_events(&sync_requests);
            sync_requests.clear();
        }
        &mut self.empty_sync_request
    }

    /// prepare all growing and shrinking, looping until no more sync requests
    /// are generated; the returned vector is always empty
    fn prepare_all(&mut self) -> &mut Vec<SyncRequest> {
        if self.children.is_none() {
            // don't do anything, not even prepare the growth because it will be done in the serial module
        } else {
            let mut sync_requests = vec![];
            // iterate until the generated sync requests settle down
            loop {
                self.iterative_prepare_all(&mut sync_requests);
                if sync_requests.is_empty() {
                    break;
                }
                self.execute_sync_events(&sync_requests);
                sync_requests.clear();
            }
        }
        &mut self.empty_sync_request
    }

    /// propagate a sync event through the subtree and apply it on every unit
    /// that mirrors the event's vertex
    fn execute_sync_event(&mut self, sync_event: &SyncRequest) {
        // if the vertex is not hold by any descendant, simply return
        if !self.is_vertex_in_descendant(sync_event.vertex_index) {
            return;
        }
        self.has_active_node = true;
        // depth-first search
        if let Some((left_child_weak, right_child_weak)) = self.children.as_ref() {
            left_child_weak.upgrade_force().write().execute_sync_event(sync_event);
            right_child_weak.upgrade_force().write().execute_sync_event(sync_event);
        }
        // update on my serial module
        if self.serial_module.contains_vertex(sync_event.vertex_index) {
            self.serial_module.execute_sync_event(sync_event);
        }
        // if I'm not on the representative path of this dual node, I need to register the propagated_dual_node
        // note that I don't need to register propagated_grandson_dual_node because it's never gonna grow inside the blossom
        if let Some((propagated_dual_node_weak, _, representative_vertex)) = sync_event.propagated_dual_node.as_ref() {
            if !self.whole_range.contains(*representative_vertex) {
                self.elevated_dual_nodes.insert(propagated_dual_node_weak.upgrade_force());
            }
        }
        if let Some((propagated_dual_node_weak, _, representative_vertex)) =
            sync_event.propagated_grandson_dual_node.as_ref()
        {
            if !self.whole_range.contains(*representative_vertex) {
                self.elevated_dual_nodes.insert(propagated_dual_node_weak.upgrade_force());
            }
        }
    }
}
1323
/// interface consists of several vertices; each vertex exists as a virtual vertex in several different serial dual modules.
/// each virtual vertex exists in at most one interface
pub struct InterfaceData {
    /// the serial dual modules that process these virtual vertices; weak references,
    /// so the interface does not keep the modules alive
    pub possession_modules: Vec<DualModuleSerialWeak>,
    /// the virtual vertices references in different modules, indexed as
    /// [idx of serial dual module][idx of interfacing vertex]
    pub interfacing_vertices: Vec<Vec<VertexWeak>>,
}
1332
/// interface between dual modules, consisting of a list of nodes of virtual nodes that sit on different modules
pub struct Interface {
    /// unique interface id for ease of zero-cost switching
    pub interface_id: usize,
    /// link to the shared interface data; weak reference, the data is owned elsewhere
    pub data: Weak<InterfaceData>,
}
1340
1341#[cfg(test)]
1342pub mod tests {
1343    use super::super::example_codes::*;
1344    use super::super::primal_module::*;
1345    use super::super::primal_module_serial::*;
1346    use super::*;
1347
    /// run a full decoding cycle on `code` with a partitioned parallel dual module:
    /// solve the syndrome, extract the perfect matching and its subgraph, and check that
    /// the sum of dual variables equals both the subgraph weight and `final_dual * 2`.
    ///
    /// * `visualize_filename`: when `Some`, snapshots are written into the visualize data folder
    /// * `partition_func`: fills in the `PartitionConfig` (partition ranges and fusion plan)
    /// * `reordered_vertices`: optional vertex permutation applied before partitioning, so that
    ///   each partition occupies a contiguous vertex index range
    ///
    /// returns the interface, primal module and dual module for further inspection by the caller
    pub fn dual_module_parallel_basic_standard_syndrome_optional_viz<F>(
        mut code: impl ExampleCode,
        visualize_filename: Option<String>,
        mut defect_vertices: Vec<VertexIndex>,
        final_dual: Weight,
        partition_func: F,
        reordered_vertices: Option<Vec<VertexIndex>>,
    ) -> (
        DualModuleInterfacePtr,
        PrimalModuleSerialPtr,
        DualModuleParallel<DualModuleSerial>,
    )
    where
        F: Fn(&SolverInitializer, &mut PartitionConfig),
    {
        println!("{defect_vertices:?}");
        // apply the vertex permutation first: the defect indices are given in the original
        // layout, so they must be translated into the reordered layout as well
        if let Some(reordered_vertices) = &reordered_vertices {
            code.reorder_vertices(reordered_vertices);
            defect_vertices = translated_defect_to_reordered(reordered_vertices, &defect_vertices);
        }
        let mut visualizer = match visualize_filename.as_ref() {
            Some(visualize_filename) => {
                let visualizer = Visualizer::new(
                    Some(visualize_data_folder() + visualize_filename.as_str()),
                    code.get_positions(),
                    true,
                )
                .unwrap();
                print_visualize_link(visualize_filename.clone());
                Some(visualizer)
            }
            None => None,
        };
        let initializer = code.get_initializer();
        let mut partition_config = PartitionConfig::new(initializer.vertex_num);
        partition_func(&initializer, &mut partition_config);
        println!("partition_config: {partition_config:?}");
        let partition_info = partition_config.info();
        // create dual module
        let mut dual_module =
            DualModuleParallel::new_config(&initializer, &partition_info, DualModuleParallelConfig::default());
        dual_module.static_fuse_all();
        // create primal module
        let mut primal_module = PrimalModuleSerialPtr::new_empty(&initializer);
        primal_module.write().debug_resolve_only_one = true; // to enable debug mode
        // try to work on a simple syndrome
        code.set_defect_vertices(&defect_vertices);
        let interface_ptr = DualModuleInterfacePtr::new_empty();
        primal_module.solve_visualizer(&interface_ptr, &code.get_syndrome(), &mut dual_module, visualizer.as_mut());
        let perfect_matching = primal_module.perfect_matching(&interface_ptr, &mut dual_module);
        // build the correction subgraph from the matching to cross-check the dual objective
        let mut subgraph_builder = SubGraphBuilder::new(&initializer);
        subgraph_builder.load_perfect_matching(&perfect_matching);
        let subgraph = subgraph_builder.get_subgraph();
        if let Some(visualizer) = visualizer.as_mut() {
            visualizer
                .snapshot_combined(
                    "perfect matching and subgraph".to_string(),
                    vec![
                        &interface_ptr,
                        &dual_module,
                        &perfect_matching,
                        &VisualizeSubgraph::new(&subgraph),
                    ],
                )
                .unwrap();
        }
        // sanity checks: the dual objective must match the weight of the extracted subgraph,
        // and it must match the expected value supplied by the test case
        assert_eq!(
            interface_ptr.sum_dual_variables(),
            subgraph_builder.total_weight(),
            "unmatched sum dual variables"
        );
        assert_eq!(
            interface_ptr.sum_dual_variables(),
            final_dual * 2,
            "unexpected final dual variable sum"
        );
        (interface_ptr, primal_module, dual_module)
    }
1426
1427    pub fn dual_module_parallel_standard_syndrome<F>(
1428        code: impl ExampleCode,
1429        visualize_filename: String,
1430        defect_vertices: Vec<VertexIndex>,
1431        final_dual: Weight,
1432        partition_func: F,
1433        reordered_vertices: Option<Vec<VertexIndex>>,
1434    ) -> (
1435        DualModuleInterfacePtr,
1436        PrimalModuleSerialPtr,
1437        DualModuleParallel<DualModuleSerial>,
1438    )
1439    where
1440        F: Fn(&SolverInitializer, &mut PartitionConfig),
1441    {
1442        dual_module_parallel_basic_standard_syndrome_optional_viz(
1443            code,
1444            Some(visualize_filename),
1445            defect_vertices,
1446            final_dual,
1447            partition_func,
1448            reordered_vertices,
1449        )
1450    }
1451
1452    /// test a simple case
1453    #[test]
1454    fn dual_module_parallel_basic_1() {
1455        // cargo test dual_module_parallel_basic_1 -- --nocapture
1456        let visualize_filename = "dual_module_parallel_basic_1.json".to_string();
1457        let defect_vertices = vec![39, 52, 63, 90, 100];
1458        let half_weight = 500;
1459        dual_module_parallel_standard_syndrome(
1460            CodeCapacityPlanarCode::new(11, 0.1, half_weight),
1461            visualize_filename,
1462            defect_vertices,
1463            9 * half_weight,
1464            |initializer, _config| {
1465                println!("initializer: {initializer:?}");
1466            },
1467            None,
1468        );
1469    }
1470
1471    /// split into 2, with no syndrome vertex on the interface
1472    #[test]
1473    fn dual_module_parallel_basic_2() {
1474        // cargo test dual_module_parallel_basic_2 -- --nocapture
1475        let visualize_filename = "dual_module_parallel_basic_2.json".to_string();
1476        let defect_vertices = vec![39, 52, 63, 90, 100];
1477        let half_weight = 500;
1478        dual_module_parallel_standard_syndrome(
1479            CodeCapacityPlanarCode::new(11, 0.1, half_weight),
1480            visualize_filename,
1481            defect_vertices,
1482            9 * half_weight,
1483            |_initializer, config| {
1484                config.partitions = vec![
1485                    VertexRange::new(0, 72),   // unit 0
1486                    VertexRange::new(84, 132), // unit 1
1487                ];
1488                config.fusions = vec![
1489                    (0, 1), // unit 2, by fusing 0 and 1
1490                ];
1491            },
1492            None,
1493        );
1494    }
1495
1496    /// split into 2, with a syndrome vertex on the interface
1497    #[test]
1498    fn dual_module_parallel_basic_3() {
1499        // cargo test dual_module_parallel_basic_3 -- --nocapture
1500        let visualize_filename = "dual_module_parallel_basic_3.json".to_string();
1501        let defect_vertices = vec![39, 52, 63, 90, 100];
1502        let half_weight = 500;
1503        dual_module_parallel_standard_syndrome(
1504            CodeCapacityPlanarCode::new(11, 0.1, half_weight),
1505            visualize_filename,
1506            defect_vertices,
1507            9 * half_weight,
1508            |_initializer, config| {
1509                config.partitions = vec![
1510                    VertexRange::new(0, 60),   // unit 0
1511                    VertexRange::new(72, 132), // unit 1
1512                ];
1513                config.fusions = vec![
1514                    (0, 1), // unit 2, by fusing 0 and 1
1515                ];
1516            },
1517            None,
1518        );
1519    }
1520
1521    /// split into 4, with no syndrome vertex on the interface
1522    #[test]
1523    fn dual_module_parallel_basic_4() {
1524        // cargo test dual_module_parallel_basic_4 -- --nocapture
1525        let visualize_filename = "dual_module_parallel_basic_4.json".to_string();
1526        // reorder vertices to enable the partition;
1527        let defect_vertices = vec![39, 52, 63, 90, 100]; // indices are before the reorder
1528        let half_weight = 500;
1529        dual_module_parallel_standard_syndrome(
1530            CodeCapacityPlanarCode::new(11, 0.1, half_weight),
1531            visualize_filename,
1532            defect_vertices,
1533            9 * half_weight,
1534            |_initializer, config| {
1535                config.partitions = vec![
1536                    VertexRange::new(0, 36),
1537                    VertexRange::new(42, 72),
1538                    VertexRange::new(84, 108),
1539                    VertexRange::new(112, 132),
1540                ];
1541                config.fusions = vec![(0, 1), (2, 3), (4, 5)];
1542            },
1543            Some({
1544                let mut reordered_vertices = vec![];
1545                let split_horizontal = 6;
1546                let split_vertical = 5;
1547                for i in 0..split_horizontal {
1548                    // left-top block
1549                    for j in 0..split_vertical {
1550                        reordered_vertices.push(i * 12 + j);
1551                    }
1552                    reordered_vertices.push(i * 12 + 11);
1553                }
1554                for i in 0..split_horizontal {
1555                    // interface between the left-top block and the right-top block
1556                    reordered_vertices.push(i * 12 + split_vertical);
1557                }
1558                for i in 0..split_horizontal {
1559                    // right-top block
1560                    for j in (split_vertical + 1)..10 {
1561                        reordered_vertices.push(i * 12 + j);
1562                    }
1563                    reordered_vertices.push(i * 12 + 10);
1564                }
1565                {
1566                    // the big interface between top and bottom
1567                    for j in 0..12 {
1568                        reordered_vertices.push(split_horizontal * 12 + j);
1569                    }
1570                }
1571                for i in (split_horizontal + 1)..11 {
1572                    // left-bottom block
1573                    for j in 0..split_vertical {
1574                        reordered_vertices.push(i * 12 + j);
1575                    }
1576                    reordered_vertices.push(i * 12 + 11);
1577                }
1578                for i in (split_horizontal + 1)..11 {
1579                    // interface between the left-bottom block and the right-bottom block
1580                    reordered_vertices.push(i * 12 + split_vertical);
1581                }
1582                for i in (split_horizontal + 1)..11 {
1583                    // right-bottom block
1584                    for j in (split_vertical + 1)..10 {
1585                        reordered_vertices.push(i * 12 + j);
1586                    }
1587                    reordered_vertices.push(i * 12 + 10);
1588                }
1589                reordered_vertices
1590            }),
1591        );
1592    }
1593
1594    /// split into 4, with 2 defect vertices on parent interfaces
1595    #[test]
1596    fn dual_module_parallel_basic_5() {
1597        // cargo test dual_module_parallel_basic_5 -- --nocapture
1598        let visualize_filename = "dual_module_parallel_basic_5.json".to_string();
1599        // reorder vertices to enable the partition;
1600        let defect_vertices = vec![39, 52, 63, 90, 100]; // indices are before the reorder
1601        let half_weight = 500;
1602        dual_module_parallel_standard_syndrome(
1603            CodeCapacityPlanarCode::new(11, 0.1, half_weight),
1604            visualize_filename,
1605            defect_vertices,
1606            9 * half_weight,
1607            |_initializer, config| {
1608                config.partitions = vec![
1609                    VertexRange::new(0, 25),
1610                    VertexRange::new(30, 60),
1611                    VertexRange::new(72, 97),
1612                    VertexRange::new(102, 132),
1613                ];
1614                config.fusions = vec![(0, 1), (2, 3), (4, 5)];
1615            },
1616            Some({
1617                let mut reordered_vertices = vec![];
1618                let split_horizontal = 5;
1619                let split_vertical = 4;
1620                for i in 0..split_horizontal {
1621                    // left-top block
1622                    for j in 0..split_vertical {
1623                        reordered_vertices.push(i * 12 + j);
1624                    }
1625                    reordered_vertices.push(i * 12 + 11);
1626                }
1627                for i in 0..split_horizontal {
1628                    // interface between the left-top block and the right-top block
1629                    reordered_vertices.push(i * 12 + split_vertical);
1630                }
1631                for i in 0..split_horizontal {
1632                    // right-top block
1633                    for j in (split_vertical + 1)..10 {
1634                        reordered_vertices.push(i * 12 + j);
1635                    }
1636                    reordered_vertices.push(i * 12 + 10);
1637                }
1638                {
1639                    // the big interface between top and bottom
1640                    for j in 0..12 {
1641                        reordered_vertices.push(split_horizontal * 12 + j);
1642                    }
1643                }
1644                for i in (split_horizontal + 1)..11 {
1645                    // left-bottom block
1646                    for j in 0..split_vertical {
1647                        reordered_vertices.push(i * 12 + j);
1648                    }
1649                    reordered_vertices.push(i * 12 + 11);
1650                }
1651                for i in (split_horizontal + 1)..11 {
1652                    // interface between the left-bottom block and the right-bottom block
1653                    reordered_vertices.push(i * 12 + split_vertical);
1654                }
1655                for i in (split_horizontal + 1)..11 {
1656                    // right-bottom block
1657                    for j in (split_vertical + 1)..10 {
1658                        reordered_vertices.push(i * 12 + j);
1659                    }
1660                    reordered_vertices.push(i * 12 + 10);
1661                }
1662                reordered_vertices
1663            }),
1664        );
1665    }
1666
1667    fn dual_module_parallel_debug_repetition_code_common(
1668        d: VertexNum,
1669        visualize_filename: String,
1670        defect_vertices: Vec<VertexIndex>,
1671        final_dual: Weight,
1672    ) {
1673        let half_weight = 500;
1674        let split_vertical = (d + 1) / 2;
1675        dual_module_parallel_standard_syndrome(
1676            CodeCapacityRepetitionCode::new(d, 0.1, half_weight),
1677            visualize_filename,
1678            defect_vertices,
1679            final_dual * half_weight,
1680            |initializer, config| {
1681                config.partitions = vec![
1682                    VertexRange::new(0, split_vertical + 1),
1683                    VertexRange::new(split_vertical + 2, initializer.vertex_num),
1684                ];
1685                config.fusions = vec![(0, 1)];
1686            },
1687            Some({
1688                let mut reordered_vertices = vec![];
1689                for j in 0..split_vertical {
1690                    reordered_vertices.push(j);
1691                }
1692                reordered_vertices.push(d);
1693                for j in split_vertical..d {
1694                    reordered_vertices.push(j);
1695                }
1696                reordered_vertices
1697            }),
1698        );
1699    }
1700
1701    /// debug blossom not growing properly
1702    #[test]
1703    fn dual_module_parallel_debug_1() {
1704        // cargo test dual_module_parallel_debug_1 -- --nocapture
1705        let visualize_filename = "dual_module_parallel_debug_1.json".to_string();
1706        let defect_vertices = vec![2, 3, 4, 5, 6, 7, 8]; // indices are before the reorder
1707        dual_module_parallel_debug_repetition_code_common(11, visualize_filename, defect_vertices, 5);
1708    }
1709
1710    /// debug 'internal error: entered unreachable code: VertexShrinkStop conflict cannot be solved by primal module
1711    /// the reason of this bug is that a shrinking node on the interface is sandwiched by two growing nodes resides on different children units
1712    /// for the serial implementation, this event can be easily handled by doing special configs
1713    /// but for the fused units, how to do it?
1714    /// This is the benefit of using software to develop first; if directly working on the hardware implementation, one would have to add more interface
1715    /// to support it, which could be super time-consuming
1716    #[test]
1717    fn dual_module_parallel_debug_2() {
1718        // cargo test dual_module_parallel_debug_2 -- --nocapture
1719        let visualize_filename = "dual_module_parallel_debug_2.json".to_string();
1720        let defect_vertices = vec![5, 6, 7]; // indices are before the reorder
1721        dual_module_parallel_debug_repetition_code_common(11, visualize_filename, defect_vertices, 4);
1722    }
1723
1724    /// the reason for this bug is that I forgot to set dual_variable correctly, leading to false VertexShrinkStop event at the
1725    #[test]
1726    fn dual_module_parallel_debug_3() {
1727        // cargo test dual_module_parallel_debug_3 -- --nocapture
1728        let visualize_filename = "dual_module_parallel_debug_3.json".to_string();
1729        let defect_vertices = vec![3, 5, 7]; // indices are before the reorder
1730        dual_module_parallel_debug_repetition_code_common(11, visualize_filename, defect_vertices, 5);
1731    }
1732
1733    /// incorrect final result
1734    /// the reason is I didn't search through all the representative vertices of all children nodes, causing the parent blossom not propagating correctly
1735    #[test]
1736    fn dual_module_parallel_debug_4() {
1737        // cargo test dual_module_parallel_debug_4 -- --nocapture
1738        let visualize_filename = "dual_module_parallel_debug_4.json".to_string();
1739        let defect_vertices = vec![2, 3, 5, 6, 7]; // indices are before the reorder
1740        dual_module_parallel_debug_repetition_code_common(11, visualize_filename, defect_vertices, 5);
1741    }
1742
1743    /// unwrap fail on dual node to internal dual node
1744    /// the reason is I forgot to implement the remove_blossom API...
1745    #[test]
1746    fn dual_module_parallel_debug_5() {
1747        // cargo test dual_module_parallel_debug_5 -- --nocapture
1748        let visualize_filename = "dual_module_parallel_debug_5.json".to_string();
1749        let defect_vertices = vec![0, 4, 7, 8, 9, 11]; // indices are before the reorder
1750        dual_module_parallel_debug_repetition_code_common(15, visualize_filename, defect_vertices, 7);
1751    }
1752
1753    fn dual_module_parallel_debug_planar_code_common(
1754        d: VertexNum,
1755        visualize_filename: String,
1756        defect_vertices: Vec<VertexIndex>,
1757        final_dual: Weight,
1758    ) {
1759        let half_weight = 500;
1760        let split_horizontal = (d + 1) / 2;
1761        let row_count = d + 1;
1762        dual_module_parallel_standard_syndrome(
1763            CodeCapacityPlanarCode::new(d, 0.1, half_weight),
1764            visualize_filename,
1765            defect_vertices,
1766            final_dual * half_weight,
1767            |initializer, config| {
1768                config.partitions = vec![
1769                    VertexRange::new(0, split_horizontal * row_count),
1770                    VertexRange::new((split_horizontal + 1) * row_count, initializer.vertex_num),
1771                ];
1772                config.fusions = vec![(0, 1)];
1773            },
1774            None,
1775        );
1776    }
1777
1778    /// panic 'one cannot conflict with itself, double check to avoid deadlock'
1779    /// reason: when merging two `VertexShrinkStop` events into a single `Conflicting` event, I forget to check whether the two pointers are the same;
1780    /// if so, I should simply ignore it
1781    #[test]
1782    fn dual_module_parallel_debug_6() {
1783        // cargo test dual_module_parallel_debug_6 -- --nocapture
1784        let visualize_filename = "dual_module_parallel_debug_6.json".to_string();
1785        let defect_vertices = vec![10, 11, 13, 32, 36, 37, 40, 44]; // indices are before the reorder
1786        dual_module_parallel_debug_planar_code_common(7, visualize_filename, defect_vertices, 5);
1787    }
1788
1789    /// panic 'one cannot conflict with itself, double check to avoid deadlock'
1790    /// reason: when comparing the pointers of two `VertexShrinkStop` events, only compare their conflicting dual node, not the touching dual node
1791    #[test]
1792    fn dual_module_parallel_debug_7() {
1793        // cargo test dual_module_parallel_debug_7 -- --nocapture
1794        let visualize_filename = "dual_module_parallel_debug_7.json".to_string();
1795        let defect_vertices = vec![3, 12, 21, 24, 27, 28, 33, 35, 36, 43, 50, 51]; // indices are before the reorder
1796        dual_module_parallel_debug_planar_code_common(7, visualize_filename, defect_vertices, 10);
1797    }
1798
1799    /// panic `Option::unwrap()` on a `None` value', src/dual_module.rs:242:1
1800    #[test]
1801    fn dual_module_parallel_debug_8() {
1802        // cargo test dual_module_parallel_debug_8 -- --nocapture
1803        let visualize_filename = "dual_module_parallel_debug_8.json".to_string();
1804        let defect_vertices = vec![1, 2, 3, 4, 9, 10, 13, 16, 17, 19, 24, 29, 33, 36, 37, 44, 48, 49, 51, 52]; // indices are before the reorder
1805        dual_module_parallel_debug_planar_code_common(7, visualize_filename, defect_vertices, 13);
1806    }
1807
1808    /// panicked at 'dual node of edge should be some', src/dual_module_serial.rs:379:13
1809    /// reason: blossom's boundary has duplicate edges, solved by adding dedup functionality to edges
1810    #[test]
1811    fn dual_module_parallel_debug_9() {
1812        // cargo test dual_module_parallel_debug_9 -- --nocapture
1813        let visualize_filename = "dual_module_parallel_debug_9.json".to_string();
1814        let defect_vertices = vec![60, 61, 72, 74, 84, 85, 109]; // indices are before the reorder
1815        dual_module_parallel_debug_planar_code_common(11, visualize_filename, defect_vertices, 6);
1816    }
1817
1818    /// infinite loop at group_max_update_length: Conflicts(([Conflicting((12, 4), (15, 5))], {}))
1819    /// reason: I falsely use representative_vertex of the blossom instead of the representative vertices in the nodes circle in sync_prepare_blossom_initial_shrink
1820    #[test]
1821    fn dual_module_parallel_debug_10() {
1822        // cargo test dual_module_parallel_debug_10 -- --nocapture
1823        let visualize_filename = "dual_module_parallel_debug_10.json".to_string();
1824        let defect_vertices = vec![145, 146, 165, 166, 183, 185, 203, 204, 205, 225, 264]; // indices are before the reorder
1825        dual_module_parallel_debug_planar_code_common(19, visualize_filename, defect_vertices, 11);
1826    }
1827
1828    /// panicked at 'dual node of edge should be none', src/dual_module_serial.rs:400:25
1829    /// reason: duplicate edge in the boundary... again...
1830    /// this time it's because when judging whether an edge is already in the boundary, I mistakenly put the clearing edge logic into
1831    /// the if condition as well... when the edge is duplicate in the boundary already, my code will not clear the edge properly
1832    #[test]
1833    fn dual_module_parallel_debug_11() {
1834        // cargo test dual_module_parallel_debug_11 -- --nocapture
1835        let visualize_filename = "dual_module_parallel_debug_11.json".to_string();
1836        let defect_vertices = vec![192, 193, 194, 212, 214, 232, 233]; // indices are before the reorder
1837        dual_module_parallel_debug_planar_code_common(19, visualize_filename, defect_vertices, 7);
1838    }
1839
1840    /// panicked at 'no sync requests should arise here; make sure to deal with all sync requests before growing', src/dual_module_serial.rs:582:13
1841    /// just loop the synchronization process until no sync requests emerge
1842    #[test]
1843    fn dual_module_parallel_debug_12() {
1844        // cargo test dual_module_parallel_debug_12 -- --nocapture
1845        let visualize_filename = "dual_module_parallel_debug_12.json".to_string();
1846        let defect_vertices = vec![197, 216, 235, 275, 296, 316]; // indices are before the reorder
1847        dual_module_parallel_debug_planar_code_common(19, visualize_filename, defect_vertices, 5);
1848    }
1849
1850    /// test rayon global thread pool
1851    #[test]
1852    fn dual_module_parallel_rayon_test_1() {
1853        // cargo test dual_module_parallel_rayon_test_1 -- --nocapture
1854        rayon::scope(|_| {
1855            println!("A");
1856            rayon::scope(|s| {
1857                s.spawn(|_| println!("B"));
1858                s.spawn(|_| println!("C"));
1859                s.spawn(|_| println!("D"));
1860                s.spawn(|_| println!("E"));
1861            });
1862            println!("F");
1863            rayon::scope(|s| {
1864                s.spawn(|_| println!("G"));
1865                s.spawn(|_| println!("H"));
1866                s.spawn(|_| println!("J"));
1867            });
1868            println!("K");
1869        });
1870    }
1871
1872    #[test]
1873    fn dual_module_parallel_rayon_test_2() {
1874        // cargo test dual_module_parallel_rayon_test_2 -- --nocapture
1875        let mut results = vec![];
1876        rayon::scope(|_| {
1877            results.push("A");
1878            let (mut ret_b, mut ret_c, mut ret_d, mut ret_e) = (None, None, None, None);
1879            rayon::scope(|s| {
1880                s.spawn(|_| ret_b = Some("B"));
1881                s.spawn(|_| ret_c = Some("C"));
1882                s.spawn(|_| ret_d = Some("D"));
1883                s.spawn(|_| ret_e = Some("E"));
1884            });
1885            results.push(ret_b.unwrap());
1886            results.push(ret_c.unwrap());
1887            results.push(ret_d.unwrap());
1888            results.push(ret_e.unwrap());
1889            results.push("F");
1890            let (mut ret_g, mut ret_h, mut ret_j) = (None, None, None);
1891            rayon::scope(|s| {
1892                s.spawn(|_| ret_g = Some("G"));
1893                s.spawn(|_| ret_h = Some("H"));
1894                s.spawn(|_| ret_j = Some("J"));
1895            });
1896            results.push(ret_g.unwrap());
1897            results.push(ret_h.unwrap());
1898            results.push(ret_j.unwrap());
1899            results.push("K");
1900        });
1901        println!("results: {results:?}");
1902    }
1903
1904    #[test]
1905    fn dual_module_parallel_rayon_test_3() {
1906        // cargo test dual_module_parallel_rayon_test_3 -- --nocapture
1907        let mut results = vec![];
1908        rayon::scope(|_| {
1909            results.push("A");
1910            results.par_extend(["B", "C", "D", "E"].into_par_iter().map(|id| {
1911                // some complex calculation
1912                id
1913            }));
1914            results.push("F");
1915            results.par_extend(["G", "H", "J"].into_par_iter().map(|id| {
1916                // some complex calculation
1917                id
1918            }));
1919            results.push("K");
1920        });
1921        println!("results: {results:?}");
1922    }
1923}