1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
//! Parallel Primal Module
//! 
//! A parallel implementation of the primal module, by calling functions provided by the serial primal module
//!

use super::util::*;
use serde::{Serialize, Deserialize};
use crate::rayon::prelude::*;
use super::primal_module::*;
use super::primal_module_serial::*;
use super::dual_module_parallel::*;
use super::visualize::*;
use super::dual_module::*;
use std::sync::{Arc, Mutex, Condvar};
use std::ops::DerefMut;
use std::time::{Instant, Duration};
use super::pointers::*;


/// a parallel implementation of the primal module that dispatches work to a set of
/// wrapped serial primal modules, one per partition unit
pub struct PrimalModuleParallel {
    /// the basic wrapped serial modules at the beginning, afterwards the fused units are appended after them
    pub units: Vec<PrimalModuleParallelUnitPtr>,
    /// local configuration
    pub config: PrimalModuleParallelConfig,
    /// partition information generated by the config
    pub partition_info: Arc<PartitionInfo>,
    /// thread pool used to execute async functions in parallel
    pub thread_pool: Arc<rayon::ThreadPool>,
    /// the time of calling [`PrimalModuleParallel::parallel_solve_step_callback`] method;
    /// event times and streaming-decode mock deadlines are measured relative to this instant
    pub last_solve_start_time: ArcRwLock<Instant>,
}

/// one unit of the parallel primal module: either a base partition (leaf of the dependency
/// tree) or a fusion unit that combines the results of its two children
pub struct PrimalModuleParallelUnit {
    /// the index
    pub unit_index: usize,
    /// the dual module interface, for constant-time clear
    pub interface_ptr: DualModuleInterfacePtr,
    /// partition information generated by the config
    pub partition_info: Arc<PartitionInfo>,
    /// whether it's active or not; some units are "placeholder" units that are not active until they actually fuse their children
    pub is_active: bool,
    /// the owned serial primal module
    pub serial_module: PrimalModuleSerialPtr,
    /// left and right children dual modules; `None` for base-partition (leaf) units
    pub children: Option<(PrimalModuleParallelUnitWeak, PrimalModuleParallelUnitWeak)>,
    /// parent dual module; `None` for the root unit
    pub parent: Option<PrimalModuleParallelUnitWeak>,
    /// record the time of events; filled in after this unit is solved
    pub event_time: Option<PrimalModuleParallelUnitEventTime>,
    /// streaming decode mocker, if exists, base partition will wait until specified time and then start decoding
    pub streaming_decode_mocker: Option<StreamingDecodeMocker>,
}

/// strong (owning) pointer to a [`PrimalModuleParallelUnit`], with manually-managed lock safety
pub type PrimalModuleParallelUnitPtr = ArcManualSafeLock<PrimalModuleParallelUnit>;
/// weak pointer to a [`PrimalModuleParallelUnit`]; used for the parent/children links so that
/// units in the dependency tree do not keep each other alive
pub type PrimalModuleParallelUnitWeak = WeakManualSafeLock<PrimalModuleParallelUnit>;

impl std::fmt::Debug for PrimalModuleParallelUnitPtr {
    /// print only the unit index, for concise debugging output
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self.read_recursive().unit_index)
    }
}

impl std::fmt::Debug for PrimalModuleParallelUnitWeak {
    /// delegate to the strong pointer's `Debug` implementation
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let strong_ptr = self.upgrade_force();
        strong_ptr.fmt(f)
    }
}

/// the time of critical events, for profiling purposes
#[derive(Debug, Clone, Serialize)]
pub struct PrimalModuleParallelUnitEventTime {
    /// unit starts executing, in seconds relative to the start of the solve
    pub start: f64,
    /// unit ends executing, in seconds relative to the start of the solve
    pub end: f64,
    /// index of the rayon thread that executed this unit
    pub thread_index: usize,
}

impl Default for PrimalModuleParallelUnitEventTime {
    fn default() -> Self {
        Self::new()
    }
}

impl PrimalModuleParallelUnitEventTime {
    /// construct with zeroed timestamps, recording the current rayon thread index
    /// (0 when called outside of a rayon thread pool)
    pub fn new() -> Self {
        let thread_index = rayon::current_thread_index().unwrap_or(0);
        Self { start: 0., end: 0., thread_index }
    }
}

/// configuration of the parallel primal module; unknown fields are rejected during deserialization
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PrimalModuleParallelConfig {
    /// enable async execution of dual operations; only used when calling top-level operations, not used in individual units
    #[serde(default = "primal_module_parallel_default_configs::thread_pool_size")]
    pub thread_pool_size: usize,
    /// debug by sequentially run the fusion tasks, user must enable this for visualizer to work properly during the execution
    #[serde(default = "primal_module_parallel_default_configs::debug_sequential")]
    pub debug_sequential: bool,
    /// schedule base partition tasks in the front
    #[serde(default = "primal_module_parallel_default_configs::prioritize_base_partition")]
    pub prioritize_base_partition: bool,
    /// start interleaving fusion tasks with base-partition tasks after this many base partitions
    #[serde(default = "primal_module_parallel_default_configs::interleaving_base_fusion")]
    pub interleaving_base_fusion: usize,
    /// pin threads to cores sequentially
    #[serde(default = "primal_module_parallel_default_configs::pin_threads_to_cores")]
    pub pin_threads_to_cores: bool,
    /// streaming decode mocker: when set, the i-th base partition's syndrome is mocked to become
    /// ready `(i+1) * interval` seconds after the solve starts (see [`StreamingDecodeMocker`])
    pub streaming_decode_mock_measure_interval: Option<f64>,
    /// streaming decoder using spin lock instead of threads.sleep to avoid context switch
    #[serde(default = "primal_module_parallel_default_configs::streaming_decode_use_spin_lock")]
    pub streaming_decode_use_spin_lock: bool,
}

impl Default for PrimalModuleParallelConfig {
    /// deserialize from an empty JSON object so that every field takes its serde default
    fn default() -> Self { serde_json::from_value(json!({})).unwrap() }
}

/// serde default values for [`PrimalModuleParallelConfig`]
pub mod primal_module_parallel_default_configs {
    /// 0 lets the thread pool default to the number of CPU cores; set to 1 to debug on a single core
    pub fn thread_pool_size() -> usize { 0 }
    /// by default disabled: enable only when you need to debug and get visualizer to work during the execution
    /// (the original comment claimed "by default enabled", contradicting the returned value)
    pub fn debug_sequential() -> bool { false }
    /// pin threads to cores to achieve most stable results
    pub fn pin_threads_to_cores() -> bool { false }
    /// by default enable because this is faster by placing time-consuming tasks in the front
    pub fn prioritize_base_partition() -> bool { true }
    /// starts interleaving base and fusion after this unit_index; `usize::MAX` disables interleaving
    pub fn interleaving_base_fusion() -> usize { usize::MAX }
    /// by default use threads.sleep; enable only when benchmarking latency
    pub fn streaming_decode_use_spin_lock() -> bool { false }
}

/// mock of streaming hardware: when present on a base partition, the unit blocks until the
/// mocked syndrome measurement becomes available before it starts decoding
pub struct StreamingDecodeMocker {
    /// indicating the syndrome ready time = `last_solve_start_time` + bias
    pub bias: Duration,
}

impl PrimalModuleParallel {

    /// recommended way to create a new instance, given a customized configuration;
    /// builds the thread pool (optionally pinning its threads to cores), constructs one wrapped
    /// serial primal module per partition unit in parallel, and wires up the parent/children
    /// dependency tree and the streaming-decode mockers
    pub fn new_config(initializer: &SolverInitializer, partition_info: &PartitionInfo, config: PrimalModuleParallelConfig) -> Self {
        let partition_info = Arc::new(partition_info.clone());
        let mut thread_pool_builder = rayon::ThreadPoolBuilder::new();
        if config.thread_pool_size != 0 {
            // 0 means "let rayon decide" (defaults to the number of CPU cores)
            thread_pool_builder = thread_pool_builder.num_threads(config.thread_pool_size);
        }
        if config.pin_threads_to_cores {
            let core_ids = core_affinity::get_core_ids().unwrap();
            // println!("core_ids: {core_ids:?}");
            thread_pool_builder = thread_pool_builder.start_handler(move |thread_index| {
                // https://stackoverflow.com/questions/7274585/linux-find-out-hyper-threaded-core-id
                if thread_index < core_ids.len() {
                    crate::core_affinity::set_for_current(core_ids[thread_index]);
                }  // otherwise let OS decide which core to execute
            });
        }
        let thread_pool = thread_pool_builder.build().expect("creating thread pool failed");
        let mut units = vec![];
        let unit_count = partition_info.units.len();
        // construct the units in parallel, each owning a fresh empty serial primal module
        thread_pool.scope(|_| {
            (0..unit_count).into_par_iter().map(|unit_index| {
                // println!("unit_index: {unit_index}");
                let primal_module = PrimalModuleSerialPtr::new_empty(initializer);
                PrimalModuleParallelUnitPtr::new_wrapper(primal_module, unit_index, Arc::clone(&partition_info))
            }).collect_into_vec(&mut units);
        });
        // fill in the children and parent references
        for unit_index in 0..unit_count {
            let mut unit = units[unit_index].write();
            if let Some((left_children_index, right_children_index)) = &partition_info.units[unit_index].children {
                unit.children = Some((units[*left_children_index].downgrade(), units[*right_children_index].downgrade()))
            }
            if let Some(parent_index) = &partition_info.units[unit_index].parent {
                unit.parent = Some(units[*parent_index].downgrade());
            }
            if let Some(measure_interval) = config.streaming_decode_mock_measure_interval {
                if unit_index < partition_info.config.partitions.len() {  // only base partition is blocked by mock hardware syndrome measurement
                    // the i-th base partition's syndrome is mocked ready (i+1) * measure_interval seconds after the solve starts
                    unit.streaming_decode_mocker = Some(StreamingDecodeMocker {
                        bias: Duration::from_secs_f64(measure_interval * (unit_index + 1) as f64),
                    })
                }
            }
        }
        Self {
            units,
            config,
            partition_info,
            thread_pool: Arc::new(thread_pool),
            last_solve_start_time: ArcRwLock::new_value(Instant::now()),
        }
    }

}

impl PrimalModuleImpl for PrimalModuleParallel {

    /// create with a trivial partition over all vertices and the default configuration
    fn new_empty(initializer: &SolverInitializer) -> Self {
        let partition_config = PartitionConfig::new(initializer.vertex_num);
        Self::new_config(initializer, &partition_config.info(), PrimalModuleParallelConfig::default())
    }

    #[inline(never)]
    fn clear(&mut self) {
        // clear every unit in parallel; leaves of the dependency tree (units without children)
        // become active again, while fusion units stay inactive until they fuse their children
        self.thread_pool.scope(|_| {
            self.units.par_iter().enumerate().for_each(|(index, unit_ptr)| {
                let mut unit = unit_ptr.write();
                let is_leaf = unit.partition_info.units[index].children.is_none();
                unit.clear();
                unit.is_active = is_leaf;
            });
        });
    }

    /// loading defects directly is not supported at the parallel level
    fn load_defect_dual_node(&mut self, _dual_node_ptr: &DualNodePtr) {
        panic!("load interface directly into the parallel primal module is forbidden, use `parallel_solve` instead");
    }

    /// global resolve requests are not supported at the parallel level
    fn resolve<D: DualModuleImpl>(&mut self, _group_max_update_length: GroupMaxUpdateLength, _interface: &DualModuleInterfacePtr, _dual_module: &mut D) {
        panic!("parallel primal module cannot handle global resolve requests, use `parallel_solve` instead");
    }

    /// gather the intermediate matchings of all active units into one
    fn intermediate_matching<D: DualModuleImpl>(&mut self, interface: &DualModuleInterfacePtr, dual_module: &mut D) -> IntermediateMatching {
        let mut combined = IntermediateMatching::new();
        for unit_ptr in self.units.iter() {
            lock_write!(unit, unit_ptr);
            if unit.is_active {  // skip inactive units
                combined.append(&mut unit.serial_module.intermediate_matching(interface, dual_module));
            }
        }
        combined
    }

    /// report every unit's recorded event time, for profiling
    fn generate_profiler_report(&self) -> serde_json::Value {
        let event_time_vec: Vec<_> = self.units.iter()
            .map(|unit_ptr| unit_ptr.read_recursive().event_time.clone())
            .collect();
        json!({ "event_time_vec": event_time_vec })
    }

}

impl PrimalModuleParallel {

    /// solve the syndrome pattern in parallel, without any step callback
    pub fn parallel_solve<DualSerialModule: DualModuleImpl + Send + Sync>
            (&mut self, syndrome_pattern: &SyndromePattern, parallel_dual_module: &mut DualModuleParallel<DualSerialModule>) {
        self.parallel_solve_step_callback(syndrome_pattern, parallel_dual_module, |_, _, _, _| {})
    }

    /// solve with an optional visualizer, taking a snapshot on every grow/resolve step and when
    /// each unit finishes; intermediate snapshots only happen in the sequential execution paths
    /// (see [`Self::parallel_solve_step_callback`])
    pub fn parallel_solve_visualizer<DualSerialModule: DualModuleImpl + Send + Sync + FusionVisualizer>
            (&mut self, syndrome_pattern: &SyndromePattern, parallel_dual_module: &mut DualModuleParallel<DualSerialModule>
            , visualizer: Option<&mut Visualizer>) {
        if let Some(visualizer) = visualizer {
            self.parallel_solve_step_callback(syndrome_pattern, parallel_dual_module
                , |interface_ptr, dual_module, primal_module, group_max_update_length| {
                    if let Some(group_max_update_length) = group_max_update_length {
                        if cfg!(debug_assertions) {
                            println!("group_max_update_length: {:?}", group_max_update_length);
                        }
                        if let Some(length) = group_max_update_length.get_none_zero_growth() {
                            visualizer.snapshot_combined(format!("grow {length}"), vec![interface_ptr, dual_module, primal_module]).unwrap();
                        } else {
                            let first_conflict = format!("{:?}", group_max_update_length.peek().unwrap());
                            visualizer.snapshot_combined(format!("resolve {first_conflict}"), vec![interface_ptr, dual_module, primal_module]).unwrap();
                        };
                    } else {
                        // a `None` update length marks unit-level events (fusion or unit completion)
                        visualizer.snapshot_combined("unit solved".to_string(), vec![interface_ptr, dual_module, primal_module]).unwrap();
                    }
                });
            let last_unit = self.units.last().unwrap().read_recursive();
            visualizer.snapshot_combined("solved".to_string(), vec![&last_unit.interface_ptr, parallel_dual_module, self]).unwrap();
        } else {
            self.parallel_solve(syndrome_pattern, parallel_dual_module);
        }
    }

    /// the main solve entry: record the solve start time and schedule all units;
    /// `callback` is only invoked on the sequential paths (`debug_sequential`, or the iterative
    /// solve when `prioritize_base_partition` is disabled) — the multi-threaded scheduling paths
    /// pass `&mut None` because the callback cannot be safely shared across threads
    pub fn parallel_solve_step_callback<DualSerialModule: DualModuleImpl + Send + Sync, F: Send + Sync>
            (&mut self, syndrome_pattern: &SyndromePattern, parallel_dual_module: &mut DualModuleParallel<DualSerialModule>, mut callback: F)
            where F: FnMut(&DualModuleInterfacePtr, &DualModuleParallelUnit<DualSerialModule>, &PrimalModuleSerialPtr, Option<&GroupMaxUpdateLength>) {
        let thread_pool = Arc::clone(&self.thread_pool);
        *self.last_solve_start_time.write() = Instant::now();
        if self.config.prioritize_base_partition {
            if self.config.debug_sequential {
                // sequential: solve units in index order; children always precede their parent
                for unit_index in 0..self.partition_info.units.len() {
                    let unit_ptr = self.units[unit_index].clone();
                    unit_ptr.children_ready_solve::<DualSerialModule, F>(self, PartitionedSyndromePattern::new(syndrome_pattern)
                        , parallel_dual_module, &mut Some(&mut callback));
                }
            } else {
                use std::sync::atomic::{AtomicUsize, Ordering};
                // one readiness record per unit: (mutex-guarded flag + condvar) for the sleeping
                // path, plus an atomic flag for the spin-lock path
                let ready_vec: Vec<_> = {
                    (0..self.partition_info.units.len()).map(|_| Arc::new((Mutex::new(false), Condvar::new(), Arc::new(AtomicUsize::new(0))))).collect()
                };
                thread_pool.scope_fifo(|s| {
                    // spawn a FIFO task that (for fusion units) waits for both children to be
                    // marked ready, then solves the unit and marks it ready in turn
                    let issue_unit = |unit_index: usize| {
                        let ready_vec = &ready_vec;
                        let units = &self.units;
                        let partition_info = &self.partition_info;
                        let parallel_unit = &self;
                        let parallel_dual_module = &parallel_dual_module;
                        let streaming_decode_use_spin_lock = self.config.streaming_decode_use_spin_lock;
                        s.spawn_fifo(move |_| {
                            let ready_pair = ready_vec[unit_index].clone();
                            let &(ref ready, ref condvar, ref spin_ready) = &*ready_pair;
                            if streaming_decode_use_spin_lock {
                                let unit_ptr = units[unit_index].clone();
                                if unit_index >= partition_info.config.partitions.len() {  // wait for children to complete
                                    let fusion_index = unit_index - partition_info.config.partitions.len();
                                    let (left_unit_index, right_unit_index) = partition_info.config.fusions[fusion_index];
                                    for child_unit_index in [left_unit_index, right_unit_index] {
                                        let child_ready_pair = ready_vec[child_unit_index].clone();
                                        let &(_, _, ref child_spin_ready) = &*child_ready_pair;
                                        while child_spin_ready.load(Ordering::SeqCst) != 1 {  // hopefully this asserts false at the beginning
                                            std::hint::spin_loop();
                                            // println!("spin_loop");
                                        }
                                    }
                                }
                                unit_ptr.children_ready_solve::<DualSerialModule, F>(parallel_unit, PartitionedSyndromePattern::new(syndrome_pattern)
                                    , parallel_dual_module, &mut None);
                                // publish readiness for any parent spinning on this flag
                                spin_ready.store(1, Ordering::SeqCst);
                            } else {
                                let mut is_ready = ready.lock().unwrap();
                                let unit_ptr = units[unit_index].clone();
                                if unit_index >= partition_info.config.partitions.len() {  // wait for children to complete
                                    let fusion_index = unit_index - partition_info.config.partitions.len();
                                    let (left_unit_index, right_unit_index) = partition_info.config.fusions[fusion_index];
                                    for child_unit_index in [left_unit_index, right_unit_index] {
                                        let child_ready_pair = ready_vec[child_unit_index].clone();
                                        let &(ref child_ready, ref child_condvar, _) = &*child_ready_pair;
                                        let mut child_is_ready = child_ready.lock().unwrap();
                                        // loop guards against spurious condvar wakeups
                                        while !*child_is_ready {  // hopefully this asserts false at the beginning
                                            child_is_ready = child_condvar.wait(child_is_ready).unwrap();
                                        }
                                    }
                                }
                                unit_ptr.children_ready_solve::<DualSerialModule, F>(parallel_unit, PartitionedSyndromePattern::new(syndrome_pattern)
                                    , parallel_dual_module, &mut None);
                                *is_ready = true;
                                condvar.notify_one();
                            }
                        })
                    };
                    if self.config.interleaving_base_fusion >= self.partition_info.config.fusions.len() {
                        // no interleaving: issue all units in index order (base partitions first, then fusions)
                        for unit_index in 0..self.partition_info.units.len() {
                            issue_unit(unit_index);
                        }
                    } else {
                        // interleaving: after the first `interleaving_base_fusion` base partitions,
                        // issue one fusion unit before each remaining base partition
                        for unit_index in 0..self.partition_info.config.partitions.len() {
                            if unit_index >= self.config.interleaving_base_fusion {
                                let fusion_index = self.partition_info.config.partitions.len() + (unit_index - self.config.interleaving_base_fusion);
                                issue_unit(fusion_index);
                            }
                            issue_unit(unit_index);
                        }
                        // issue the trailing fusion units that were not interleaved above
                        for bias_index in 1..self.config.interleaving_base_fusion {
                            issue_unit(self.partition_info.units.len() - self.config.interleaving_base_fusion + bias_index);
                        }
                    }
                });
            }
        } else {
            // recursively solve starting from the root of the dependency tree (the last unit)
            let last_unit_ptr = self.units.last().unwrap().clone();
            thread_pool.scope(|_| {
                last_unit_ptr.iterative_solve_step_callback(self, PartitionedSyndromePattern::new(syndrome_pattern), parallel_dual_module, &mut Some(&mut callback))
            })
        }
    }

}

impl FusionVisualizer for PrimalModuleParallel {
    /// combine the snapshots of all active units into a single JSON value
    fn snapshot(&self, abbrev: bool) -> serde_json::Value {
        let mut combined = json!({});
        for unit_ptr in self.units.iter() {
            let unit = unit_ptr.read_recursive();
            if unit.is_active {  // do not visualize inactive units
                snapshot_combine_values(&mut combined, unit.snapshot(abbrev), abbrev);
            }
        }
        combined
    }
}

impl FusionVisualizer for PrimalModuleParallelUnit {
    /// a unit's visualization is exactly that of its owned serial primal module
    fn snapshot(&self, abbrev: bool) -> serde_json::Value {
        let serial_module = &self.serial_module;
        serial_module.snapshot(abbrev)
    }
}

impl PrimalModuleParallelUnitPtr {

    /// create a simple wrapper over a serial dual module; only leaves of the dependency tree
    /// (units without children) start active, and the parent/children links are filled in later
    /// by [`PrimalModuleParallel::new_config`]
    pub fn new_wrapper(serial_module: PrimalModuleSerialPtr, unit_index: usize, partition_info: Arc<PartitionInfo>) -> Self {
        let partition_unit_info = &partition_info.units[unit_index];
        let is_active = partition_unit_info.children.is_none();
        let interface_ptr = DualModuleInterfacePtr::new_empty();
        interface_ptr.write().unit_index = unit_index;
        Self::new_value(PrimalModuleParallelUnit {
            unit_index,
            interface_ptr,
            partition_info,
            is_active,  // only activate the leaves in the dependency tree
            serial_module,
            children: None,  // to be filled later
            parent: None,  // to be filled later
            event_time: None,
            streaming_decode_mocker: None,
        })
    }

    /// call this only if children is guaranteed to be ready and solved;
    /// for fusion units this fuses both children, breaks their interface matchings, loads the
    /// owned defects and re-solves; for leaf units it simply solves the owned syndrome
    fn children_ready_solve<DualSerialModule: DualModuleImpl + Send + Sync, F: Send + Sync>(&self, primal_module_parallel: &PrimalModuleParallel
                , partitioned_syndrome_pattern: PartitionedSyndromePattern, parallel_dual_module: &DualModuleParallel<DualSerialModule>, callback: &mut Option<&mut F>)
            where F: FnMut(&DualModuleInterfacePtr, &DualModuleParallelUnit<DualSerialModule>, &PrimalModuleSerialPtr, Option<&GroupMaxUpdateLength>) {
        let mut primal_unit = self.write();
        // streaming decode mock: block until the mocked syndrome ready time has passed
        if let Some(mocker) = &primal_unit.streaming_decode_mocker {
            if primal_module_parallel.config.streaming_decode_use_spin_lock {
                while primal_module_parallel.last_solve_start_time.read_recursive().elapsed() < mocker.bias {
                    std::hint::spin_loop();  // spin to avoid context switch
                }
            } else {
                let mut elapsed = primal_module_parallel.last_solve_start_time.read_recursive().elapsed();
                while elapsed < mocker.bias {
                    std::thread::sleep(mocker.bias - elapsed);
                    elapsed = primal_module_parallel.last_solve_start_time.read_recursive().elapsed();
                }
            }
        }
        // record the execution window of this unit, relative to the solve start time
        let mut event_time = PrimalModuleParallelUnitEventTime::new();
        event_time.start = primal_module_parallel.last_solve_start_time.read_recursive().elapsed().as_secs_f64();
        let dual_module_ptr = parallel_dual_module.get_unit(primal_unit.unit_index);
        let mut dual_unit = dual_module_ptr.write();
        let partition_unit_info = &primal_unit.partition_info.units[primal_unit.unit_index];
        let (owned_defect_range, _) = partitioned_syndrome_pattern.partition(partition_unit_info);
        let interface_ptr = primal_unit.interface_ptr.clone();
        if let Some((left_child_weak, right_child_weak)) = primal_unit.children.as_ref() {
            // fusion unit: merge the two solved children and re-solve around the boundary
            {  // set children to inactive to avoid being solved twice
                for child_weak in [left_child_weak, right_child_weak] {
                    let child_ptr = child_weak.upgrade_force();
                    let mut child = child_ptr.write();
                    debug_assert!(child.is_active, "cannot fuse inactive children");
                    child.is_active = false;
                }
            }
            primal_unit.fuse(&mut dual_unit);
            if let Some(callback) = callback.as_mut() {  // do callback before actually breaking the matched pairs, for ease of visualization
                callback(&primal_unit.interface_ptr, &dual_unit, &primal_unit.serial_module, None);
            }
            primal_unit.break_matching_with_mirror(dual_unit.deref_mut());
            // load the defects owned by this fusion unit itself
            for defect_index in owned_defect_range.whole_defect_range.iter() {
                let defect_vertex = partitioned_syndrome_pattern.syndrome_pattern.defect_vertices[defect_index as usize];
                primal_unit.serial_module.load_defect(defect_vertex, &interface_ptr, dual_unit.deref_mut());
            }
            primal_unit.serial_module.solve_step_callback_interface_loaded(&interface_ptr, dual_unit.deref_mut()
                , |interface, dual_module, primal_module, group_max_update_length| {
                    if let Some(callback) = callback.as_mut() {
                        callback(interface, dual_module, primal_module, Some(group_max_update_length));
                    }
                });
            // final callback with `None` marks the completion of this unit
            if let Some(callback) = callback.as_mut() {
                callback(&primal_unit.interface_ptr, &dual_unit, &primal_unit.serial_module, None);
            }
        } else {
            // leaf unit: solve its owned portion of the syndrome directly
            debug_assert!(primal_unit.is_active, "leaf must be active to be solved");
            let syndrome_pattern = owned_defect_range.expand();
            primal_unit.serial_module.solve_step_callback(&interface_ptr, &syndrome_pattern, dual_unit.deref_mut()
                , |interface, dual_module, primal_module, group_max_update_length| {
                    if let Some(callback) = callback.as_mut() {
                        callback(interface, dual_module, primal_module, Some(group_max_update_length));
                    }
                });
            // final callback with `None` marks the completion of this unit
            if let Some(callback) = callback.as_mut() {
                callback(&primal_unit.interface_ptr, &dual_unit, &primal_unit.serial_module, None);
            }
        }
        primal_unit.is_active = true;
        event_time.end = primal_module_parallel.last_solve_start_time.read_recursive().elapsed().as_secs_f64();
        primal_unit.event_time = Some(event_time);
    }

    /// call on the last primal node, and it will spawn tasks on the previous ones;
    /// recursively solves both children (in parallel via `rayon::join` unless `debug_sequential`),
    /// then solves this unit once both children are ready
    fn iterative_solve_step_callback<DualSerialModule: DualModuleImpl + Send + Sync, F: Send + Sync>(&self, primal_module_parallel: &PrimalModuleParallel
                , partitioned_syndrome_pattern: PartitionedSyndromePattern, parallel_dual_module: &DualModuleParallel<DualSerialModule>, callback: &mut Option<&mut F>)
            where F: FnMut(&DualModuleInterfacePtr, &DualModuleParallelUnit<DualSerialModule>, &PrimalModuleSerialPtr, Option<&GroupMaxUpdateLength>) {
        let primal_unit = self.read_recursive();
        // only when sequentially running the tasks will the callback take effect, otherwise it's unsafe to execute it from multiple threads
        let debug_sequential = primal_module_parallel.config.debug_sequential;
        if let Some((left_child_weak, right_child_weak)) = primal_unit.children.as_ref() {  // make children ready
            debug_assert!(!primal_unit.is_active, "parent must be inactive at the time of solving children");
            let partition_unit_info = &primal_unit.partition_info.units[primal_unit.unit_index];
            let (_, (left_partitioned, right_partitioned)) = partitioned_syndrome_pattern.partition(partition_unit_info);
            if debug_sequential {
                left_child_weak.upgrade_force().iterative_solve_step_callback(primal_module_parallel, left_partitioned
                    , parallel_dual_module, callback);
                right_child_weak.upgrade_force().iterative_solve_step_callback(primal_module_parallel, right_partitioned
                    , parallel_dual_module, callback);
            } else {
                // parallel path passes `&mut None`: the callback cannot be shared across threads
                rayon::join(|| {
                    left_child_weak.upgrade_force().iterative_solve_step_callback::<DualSerialModule, F>(primal_module_parallel, left_partitioned
                        , parallel_dual_module, &mut None)
                }, || {
                    right_child_weak.upgrade_force().iterative_solve_step_callback::<DualSerialModule, F>(primal_module_parallel, right_partitioned
                        , parallel_dual_module, &mut None)
                });
            };
        }
        // release the read lock before `children_ready_solve` acquires the write lock
        drop(primal_unit);
        self.children_ready_solve(primal_module_parallel, partitioned_syndrome_pattern, parallel_dual_module, callback);
    }

}

impl PrimalModuleParallelUnit {

    /// fuse two units together, by copying the right child's content into the left child's content and resolve index;
    /// note that this operation doesn't update on the dual module, call [`Self::break_matching_with_mirror`] if needed
    pub fn fuse<DualSerialModule: DualModuleImpl + Send + Sync>(&mut self, dual_unit: &mut DualModuleParallelUnit<DualSerialModule>) {
        // fusing is only meaningful for a unit that has two children
        let children = self.children.as_ref().unwrap();
        let left_child_ptr = children.0.upgrade_force();
        let right_child_ptr = children.1.upgrade_force();
        let left_child = left_child_ptr.read_recursive();
        let right_child = right_child_ptr.read_recursive();
        // fuse the dual unit first, then the serial primal module
        dual_unit.fuse(&self.interface_ptr, (&left_child.interface_ptr, &right_child.interface_ptr));
        self.serial_module.fuse(&left_child.serial_module, &right_child.serial_module);
    }

    /// break the matched pairs of interface vertices
    pub fn break_matching_with_mirror(&mut self, dual_module: &mut impl DualModuleImpl) {
        // rebuild `possible_break` so it only keeps the entries that may still need breaking later
        let mut remaining_break = vec![];
        let module = self.serial_module.read_recursive();
        for node_index in module.possible_break.iter() {
            if let Some(primal_node_ptr) = module.get_node(*node_index) {
                let mut primal_node = primal_node_ptr.write();
                // copy out the virtual vertex index, if any, so the borrow on `temporary_match` ends
                let virtual_vertex = match &primal_node.temporary_match {
                    Some((MatchTarget::VirtualVertex(vertex_index), _)) => Some(*vertex_index),
                    _ => None,
                };
                if let Some(vertex_index) = virtual_vertex {
                    if self.partition_info.vertex_to_owning_unit[vertex_index as usize] == self.unit_index {
                        // the virtual vertex is owned by this unit: break the match and let the node grow again
                        primal_node.temporary_match = None;
                        self.interface_ptr.set_grow_state(&primal_node.origin.upgrade_force(), DualNodeGrowState::Grow, dual_module);
                    } else {  // still possible break
                        remaining_break.push(*node_index);
                    }
                }
            }
        }
        drop(module);
        self.serial_module.write().possible_break = remaining_break;
    }

}

/// a parallel unit implements the primal module interface mostly by delegating to its inner serial module
impl PrimalModuleImpl for PrimalModuleParallelUnit {

    /// parallel units carry partition information that a bare initializer cannot provide,
    /// so direct construction is forbidden; use `PrimalModuleParallel::new` instead
    fn new_empty(_initializer: &SolverInitializer) -> Self {
        panic!("creating parallel unit directly from initializer is forbidden, use `PrimalModuleParallel::new` instead");
    }

    /// clear both the inner serial module and this unit's own interface
    fn clear(&mut self) {
        self.serial_module.clear();
        self.interface_ptr.clear();
    }

    /// delegate loading of an interface to the inner serial module
    fn load(&mut self, interface_ptr: &DualModuleInterfacePtr) {
        self.serial_module.load(interface_ptr)
    }

    /// delegate loading of a single defect dual node to the inner serial module
    fn load_defect_dual_node(&mut self, dual_node_ptr: &DualNodePtr) {
        self.serial_module.load_defect_dual_node(dual_node_ptr)
    }

    /// delegate conflict resolution to the inner serial module
    fn resolve<D: DualModuleImpl>(&mut self, group_max_update_length: GroupMaxUpdateLength, interface: &DualModuleInterfacePtr, dual_module: &mut D) {
        self.serial_module.resolve(group_max_update_length, interface, dual_module)
    }

    /// delegate intermediate-matching extraction to the inner serial module
    fn intermediate_matching<D: DualModuleImpl>(&mut self, interface: &DualModuleInterfacePtr, dual_module: &mut D) -> IntermediateMatching {
        self.serial_module.intermediate_matching(interface, dual_module)
    }

}

#[cfg(test)]
pub mod tests {
    use super::*;
    use super::super::example_codes::*;
    use super::super::dual_module_serial::*;

    /// solve a standard syndrome with the parallel primal module, optionally recording a visualization;
    /// `partition_func` configures how the vertices are partitioned and fused; `reordered_vertices`, if
    /// provided, permutes the vertices before solving so that the partition ranges become contiguous;
    /// asserts that the final sum of dual variables matches both the subgraph weight and `final_dual * 2`
    pub fn primal_module_parallel_basic_standard_syndrome_optional_viz<F>(mut code: impl ExampleCode, visualize_filename: Option<String>
            , mut defect_vertices: Vec<VertexIndex>, final_dual: Weight, partition_func: F, reordered_vertices: Option<Vec<VertexIndex>>)
            -> (PrimalModuleParallel, DualModuleParallel<DualModuleSerial>) where F: Fn(&SolverInitializer, &mut PartitionConfig) {
        println!("{defect_vertices:?}");
        if let Some(reordered_vertices) = &reordered_vertices {
            code.reorder_vertices(reordered_vertices);
            // the defect indices are given in pre-reorder coordinates and must be translated
            defect_vertices = translated_defect_to_reordered(reordered_vertices, &defect_vertices);
        }
        let mut visualizer = match visualize_filename.as_ref() {
            Some(visualize_filename) => {
                let visualizer = Visualizer::new(Some(visualize_data_folder() + visualize_filename.as_str()), code.get_positions(), true).unwrap();
                print_visualize_link(visualize_filename.clone());
                Some(visualizer)
            }, None => None
        };
        let initializer = code.get_initializer();
        let mut partition_config = PartitionConfig::new(initializer.vertex_num);
        partition_func(&initializer, &mut partition_config);
        let partition_info = partition_config.info();
        let mut dual_module = DualModuleParallel::new_config(&initializer, &partition_info, DualModuleParallelConfig::default());
        let mut primal_config = PrimalModuleParallelConfig::default();
        primal_config.debug_sequential = true;  // run children sequentially so visualization callbacks are safe
        let mut primal_module = PrimalModuleParallel::new_config(&initializer, &partition_info, primal_config);
        code.set_defect_vertices(&defect_vertices);
        primal_module.parallel_solve_visualizer(&code.get_syndrome(), &mut dual_module, visualizer.as_mut());
        let useless_interface_ptr = DualModuleInterfacePtr::new_empty();  // don't actually use it
        let perfect_matching = primal_module.perfect_matching(&useless_interface_ptr, &mut dual_module);
        let mut subgraph_builder = SubGraphBuilder::new(&initializer);
        subgraph_builder.load_perfect_matching(&perfect_matching);
        let subgraph = subgraph_builder.get_subgraph();
        if let Some(visualizer) = visualizer.as_mut() {
            let last_interface_ptr = &primal_module.units.last().unwrap().read_recursive().interface_ptr;
            visualizer.snapshot_combined("perfect matching and subgraph".to_string(), vec![last_interface_ptr, &dual_module
                , &perfect_matching, &VisualizeSubgraph::new(&subgraph)]).unwrap();
        }
        // the last unit is the root of the fusion tree, so its interface holds the global result
        let sum_dual_variables = primal_module.units.last().unwrap().read_recursive().interface_ptr.sum_dual_variables();
        assert_eq!(sum_dual_variables, subgraph_builder.total_weight(), "unmatched sum dual variables");
        assert_eq!(sum_dual_variables, final_dual * 2, "unexpected final dual variable sum");
        (primal_module, dual_module)
    }

    /// same as [`primal_module_parallel_basic_standard_syndrome_optional_viz`] but always records a visualization
    pub fn primal_module_parallel_standard_syndrome<F>(code: impl ExampleCode, visualize_filename: String, defect_vertices: Vec<VertexIndex>
            , final_dual: Weight, partition_func: F, reordered_vertices: Option<Vec<VertexIndex>>)
            -> (PrimalModuleParallel, DualModuleParallel<DualModuleSerial>) where F: Fn(&SolverInitializer, &mut PartitionConfig) {
        primal_module_parallel_basic_standard_syndrome_optional_viz(code, Some(visualize_filename), defect_vertices, final_dual, partition_func, reordered_vertices)
    }

    /// build the vertex reordering that splits an 11x12 planar code lattice into four blocks,
    /// cut horizontally at row `split_horizontal` and vertically at column `split_vertical`;
    /// each block is emitted first and the interface rows/columns are appended after it, so
    /// that every partitioned unit occupies a contiguous vertex index range
    fn quad_partition_reordered_vertices(split_horizontal: VertexIndex, split_vertical: VertexIndex) -> Vec<VertexIndex> {
        let mut reordered_vertices = vec![];
        for i in 0..split_horizontal {  // left-top block
            for j in 0..split_vertical {
                reordered_vertices.push(i * 12 + j);
            }
            reordered_vertices.push(i * 12 + 11);
        }
        for i in 0..split_horizontal {  // interface between the left-top block and the right-top block
            reordered_vertices.push(i * 12 + split_vertical);
        }
        for i in 0..split_horizontal {  // right-top block
            for j in (split_vertical+1)..10 {
                reordered_vertices.push(i * 12 + j);
            }
            reordered_vertices.push(i * 12 + 10);
        }
        {  // the big interface between top and bottom
            for j in 0..12 {
                reordered_vertices.push(split_horizontal * 12 + j);
            }
        }
        for i in (split_horizontal+1)..11 {  // left-bottom block
            for j in 0..split_vertical {
                reordered_vertices.push(i * 12 + j);
            }
            reordered_vertices.push(i * 12 + 11);
        }
        for i in (split_horizontal+1)..11 {  // interface between the left-bottom block and the right-bottom block
            reordered_vertices.push(i * 12 + split_vertical);
        }
        for i in (split_horizontal+1)..11 {  // right-bottom block
            for j in (split_vertical+1)..10 {
                reordered_vertices.push(i * 12 + j);
            }
            reordered_vertices.push(i * 12 + 10);
        }
        reordered_vertices
    }

    /// test a simple case
    #[test]
    fn primal_module_parallel_basic_1() {  // cargo test primal_module_parallel_basic_1 -- --nocapture
        let visualize_filename = "primal_module_parallel_basic_1.json".to_string();
        let defect_vertices = vec![39, 52, 63, 90, 100];
        let half_weight = 500;
        primal_module_parallel_standard_syndrome(CodeCapacityPlanarCode::new(11, 0.1, half_weight), visualize_filename, defect_vertices, 9 * half_weight, |initializer, _config| {
            println!("initializer: {initializer:?}");
        }, None);
    }

    /// split into 2, with no syndrome vertex on the interface
    #[test]
    fn primal_module_parallel_basic_2() {  // cargo test primal_module_parallel_basic_2 -- --nocapture
        let visualize_filename = "primal_module_parallel_basic_2.json".to_string();
        let defect_vertices = vec![39, 52, 63, 90, 100];
        let half_weight = 500;
        primal_module_parallel_standard_syndrome(CodeCapacityPlanarCode::new(11, 0.1, half_weight), visualize_filename, defect_vertices, 9 * half_weight, |_initializer, config| {
            config.partitions = vec![
                VertexRange::new(0, 72),    // unit 0
                VertexRange::new(84, 132),  // unit 1
            ];
            config.fusions = vec![
                (0, 1),  // unit 2, by fusing 0 and 1
            ];
        }, None);
    }

    /// split into 2, with a syndrome vertex on the interface
    #[test]
    fn primal_module_parallel_basic_3() {  // cargo test primal_module_parallel_basic_3 -- --nocapture
        let visualize_filename = "primal_module_parallel_basic_3.json".to_string();
        let defect_vertices = vec![39, 52, 63, 90, 100];
        let half_weight = 500;
        primal_module_parallel_standard_syndrome(CodeCapacityPlanarCode::new(11, 0.1, half_weight), visualize_filename, defect_vertices, 9 * half_weight, |_initializer, config| {
            config.partitions = vec![
                VertexRange::new(0, 60),    // unit 0
                VertexRange::new(72, 132),  // unit 1
            ];
            config.fusions = vec![
                (0, 1),  // unit 2, by fusing 0 and 1
            ];
        }, None);
    }

    /// split into 4, with no syndrome vertex on the interface
    #[test]
    fn primal_module_parallel_basic_4() {  // cargo test primal_module_parallel_basic_4 -- --nocapture
        let visualize_filename = "primal_module_parallel_basic_4.json".to_string();
        // reorder vertices to enable the partition;
        let defect_vertices = vec![39, 52, 63, 90, 100];  // indices are before the reorder
        let half_weight = 500;
        primal_module_parallel_standard_syndrome(CodeCapacityPlanarCode::new(11, 0.1, half_weight), visualize_filename, defect_vertices, 9 * half_weight, |_initializer, config| {
            config.partitions = vec![
                VertexRange::new(0, 36),
                VertexRange::new(42, 72),
                VertexRange::new(84, 108),
                VertexRange::new(112, 132),
            ];
            config.fusions = vec![
                (0, 1),
                (2, 3),
                (4, 5),
            ];
        }, Some(quad_partition_reordered_vertices(6, 5)));
    }

    /// split into 4, with 2 defect vertices on parent interfaces
    #[test]
    fn primal_module_parallel_basic_5() {  // cargo test primal_module_parallel_basic_5 -- --nocapture
        let visualize_filename = "primal_module_parallel_basic_5.json".to_string();
        // reorder vertices to enable the partition;
        let defect_vertices = vec![39, 52, 63, 90, 100];  // indices are before the reorder
        let half_weight = 500;
        primal_module_parallel_standard_syndrome(CodeCapacityPlanarCode::new(11, 0.1, half_weight), visualize_filename, defect_vertices, 9 * half_weight, |_initializer, config| {
            config.partitions = vec![
                VertexRange::new(0, 25),
                VertexRange::new(30, 60),
                VertexRange::new(72, 97),
                VertexRange::new(102, 132),
            ];
            config.fusions = vec![
                (0, 1),
                (2, 3),
                (4, 5),
            ];
        }, Some(quad_partition_reordered_vertices(5, 4)));
    }

    /// split a distance-`d` planar code horizontally into two units fused by one parent
    fn primal_module_parallel_debug_planar_code_common(d: VertexNum, visualize_filename: String, defect_vertices: Vec<VertexIndex>, final_dual: Weight) {
        let half_weight = 500;
        let split_horizontal = (d + 1) / 2;
        let row_count = d + 1;
        primal_module_parallel_standard_syndrome(CodeCapacityPlanarCode::new(d, 0.1, half_weight), visualize_filename, defect_vertices, final_dual * half_weight, |initializer, config| {
            config.partitions = vec![
                VertexRange::new(0, split_horizontal * row_count),
                VertexRange::new((split_horizontal + 1) * row_count, initializer.vertex_num),
            ];
            config.fusions = vec![
                (0, 1),
            ];
        }, None);
    }

    /// 68000 vs 69000 dual variable: probably missing some interface node
    /// panicked at 'vacating a non-boundary vertex is forbidden', src/dual_module_serial.rs:899:25
    /// reason: when executing sync events, I forgot to add the new propagated dual module to the active list;
    /// why it didn't show up before: because usually a node is created when executing sync event, in which case it's automatically in the active list
    /// if this node already exists before, and it's again synchronized, then it's not in the active list, leading to strange growth 
    #[test]
    fn primal_module_parallel_debug_1() {  // cargo test primal_module_parallel_debug_1 -- --nocapture
        let visualize_filename = format!("primal_module_parallel_debug_1.json");
        let defect_vertices = vec![88, 89, 102, 103, 105, 106, 118, 120, 122, 134, 138];  // indices are before the reorder
        primal_module_parallel_debug_planar_code_common(15, visualize_filename, defect_vertices, 10);
    }

}