1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
use crate::lamellae::{AllocationType, Lamellae};
use crate::lamellar_arch::LamellarArchRT;
use crate::memregion::MemoryRegion; use crate::scheduler::{Scheduler, SchedulerQueue};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;
/// Team-wide barrier implemented with puts into per-PE memory regions.
///
/// `barrier_buf` is `None` when the calling PE is not a member of the team or the
/// team has only one PE; in that case [`Barrier::barrier`] does nothing.
pub(crate) struct Barrier {
    /// World id of the calling PE (mapped to a team index via `arch.team_pe`).
    my_pe: usize,
    /// Team layout: membership, team size and world-PE iteration.
    pub(crate) arch: Arc<LamellarArchRT>,
    /// Scheduler whose pending tasks are executed while waiting on barrier flags.
    pub(crate) scheduler: Arc<Scheduler>,
    /// Count of barriers entered so far; used to derive each barrier's id.
    barrier_cnt: AtomicUsize,
    /// Flag buffers backing the barrier, if this PE participates.
    barrier_buf: Option<SubBufs>,
}
/// Local flag buffers backing a [`Barrier`]; `Barrier::new` zeroes all three on creation.
struct SubBufs {
    /// One slot per team PE; slot `i` receives the barrier id put by team PE `i` in phase one.
    barrier1: MemoryRegion<usize>,
    /// One slot per team PE; slot `i` receives the barrier id put by team PE `i` in phase two.
    barrier2: MemoryRegion<usize>,
    /// Three-slot scratch region: slots 0 and 1 are set to the current barrier id before
    /// the phase-one and phase-two puts respectively; slot 2 is only touched at initialization.
    barrier3: MemoryRegion<usize>,
}
impl Barrier {
    /// Creates the barrier for the team described by `arch`.
    ///
    /// Flag buffers are allocated only when `my_pe` belongs to the team
    /// (`arch.team_pe(my_pe)` succeeds) and the team has more than one PE; otherwise
    /// `barrier_buf` is `None` and [`Barrier::barrier`] becomes a no-op.
    ///
    /// When the team spans all `global_pes`, the regions use `AllocationType::Global`;
    /// otherwise they use `AllocationType::Sub` over the team's sorted world PE ids.
    pub(crate) fn new(
        my_pe: usize,
        global_pes: usize,
        lamellae: Arc<Lamellae>,
        arch: Arc<LamellarArchRT>,
        scheduler: Arc<Scheduler>,
    ) -> Barrier {
        let num_pes = arch.num_pes;
        let barrier_buf = if arch.team_pe(my_pe).is_ok() && num_pes > 1 {
            let alloc = if global_pes == num_pes {
                AllocationType::Global
            } else {
                let mut pes: Vec<usize> = arch.team_iter().collect();
                pes.sort_unstable();
                AllocationType::Sub(pes)
            };
            let barrier1 = MemoryRegion::new(num_pes, lamellae.clone(), alloc.clone());
            let barrier2 = MemoryRegion::new(num_pes, lamellae.clone(), alloc.clone());
            let barrier3 = MemoryRegion::new(3, lamellae.clone(), alloc);
            // SAFETY: the regions were just created and no local code holds a reference
            // into them yet. Zeroing lets the first barrier's wait on barrier2 (id 0) pass.
            // NOTE(review): assumes region creation synchronizes the team so no remote put
            // can land before this zeroing — confirm.
            unsafe {
                barrier1.as_mut_slice().unwrap().fill(0);
                barrier2.as_mut_slice().unwrap().fill(0);
                barrier3.as_mut_slice().unwrap().fill(0);
            }
            Some(SubBufs {
                barrier1,
                barrier2,
                barrier3,
            })
        } else {
            None
        };
        Barrier {
            my_pe,
            arch,
            scheduler,
            barrier_cnt: AtomicUsize::new(0),
            barrier_buf,
        }
    }

    /// Debug helper: prints this PE's three flag buffers, if they exist.
    fn print_bar(&self) {
        if let Some(bufs) = &self.barrier_buf {
            println!(
                "[{:?}] [LAMELLAR BARRIER] {:?} {:?} {:?}",
                self.my_pe,
                bufs.barrier1.as_slice(),
                bufs.barrier2.as_slice(),
                bufs.barrier3.as_slice()
            );
        }
    }

    /// Spins until every slot of the local `barrier_buf` equals `barrier_id`,
    /// executing scheduler tasks while waiting.
    ///
    /// The slots are written by other PEs through `put_barrier_val`, i.e. outside this
    /// thread's view, so each read is volatile to force a fresh load on every spin.
    /// If a wait exceeds `crate::DEADLOCK_TIMEOUT` seconds, a warning (with a captured
    /// backtrace) and the buffer contents are printed and the timer restarts.
    fn check_barrier_vals(&self, barrier_id: usize, barrier_buf: &MemoryRegion<usize>) {
        let mut s = Instant::now();
        for pe in barrier_buf.as_slice().unwrap() {
            // SAFETY: `pe` is a valid, aligned, initialized reference into the local
            // region; the volatile read only prevents the load being hoisted out of the loop.
            while unsafe { std::ptr::read_volatile(pe as *const usize) } != barrier_id {
                self.scheduler.exec_task();
                if s.elapsed().as_secs_f64() > *crate::DEADLOCK_TIMEOUT {
                    println!("[WARNING] Potential deadlock detected.\n\
                    Barrier is a collective operation requiring all PEs associated with the distributed object to enter the barrier call.\n\
                    Please refer to https://docs.rs/lamellar/latest/lamellar/index.html?search=barrier for more information\n\
                    Note that barriers are often called internally for many collective operations, including constructing new LamellarTeams, LamellarArrays, and Darcs, as well as distributed iteration\n\
                    A full list of collective operations is found at https://docs.rs/lamellar/latest/lamellar/index.html?search=collective\n\
                    The deadlock timeout can be set via the LAMELLAR_DEADLOCK_TIMEOUT environment variable, the current timeout is {} seconds\n\
                    To view backtrace set RUST_LIB_BACKTRACE=1\n\
                    {}",*crate::DEADLOCK_TIMEOUT,std::backtrace::Backtrace::capture());
                    self.print_bar();
                    s = Instant::now();
                }
            }
        }
    }

    /// Puts `barrier_id` into slot `my_index` of `barrier_buf` on every PE in the team.
    fn put_barrier_val(
        &self,
        my_index: usize,
        barrier_id: &[usize],
        barrier_buf: &MemoryRegion<usize>,
    ) {
        for world_pe in self.arch.team_iter() {
            unsafe {
                barrier_buf.put_slice(world_pe, my_index, barrier_id);
            }
        }
    }

    /// Blocks until every PE in the team has entered this barrier.
    ///
    /// Two phases:
    /// 1. Wait until all PEs finished the previous barrier (barrier2 == previous id).
    /// 2. Announce the new id in barrier1 and wait for all PEs to do the same, then
    ///    announce completion in barrier2.
    ///
    /// Does nothing if this PE has no flag buffers (see [`Barrier::new`]).
    pub(crate) fn barrier(&self) {
        if let Some(bufs) = &self.barrier_buf {
            if let Ok(my_index) = self.arch.team_pe(self.my_pe) {
                let prev_id = self.barrier_cnt.fetch_add(1, Ordering::SeqCst);
                self.check_barrier_vals(prev_id, &bufs.barrier2);
                let barrier_id = prev_id + 1;
                // SAFETY: barrier3 is only written by this PE, and only from this method.
                let barrier3_slice = unsafe { bufs.barrier3.as_mut_slice().unwrap() };
                // Both puts source from barrier3 rather than a stack temporary. A put may
                // still be reading its source after `put_slice` returns, and a stack value
                // would be freed when this call returns. Slot 0 is not overwritten until
                // every PE has observed this put (enforced by the next call's barrier2 wait).
                barrier3_slice[0] = barrier_id;
                self.put_barrier_val(my_index, &barrier3_slice[0..1], &bufs.barrier1);
                self.check_barrier_vals(barrier_id, &bufs.barrier1);
                barrier3_slice[1] = barrier_id;
                self.put_barrier_val(my_index, &barrier3_slice[1..2], &bufs.barrier2);
            }
        }
    }
}