1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
namespace bm
{
/// Pad of 60 bytes so that the final object occupies 64 bytes (one CPU cache line)
struct pad60_struct { char c[60]; };
/// Empty padding (no cache-line padding added)
struct pad0_struct { };
/**
    Spin-lock with a two-phase acquire (relaxed read, then CAS).
    The padding parameter optionally adds a buffer to reduce CPU cache
    line contention (false sharing) with neighboring objects.
    TODO: test if padding really helps in our case
*/
template<class Pad = bm::pad0_struct>
class spin_lock
{
public:
    spin_lock() noexcept {}

    /// Acquire the lock (busy-waits until available)
    void lock() noexcept
    {
        while (1) // spin loop
        {
            // test-and-test-and-set: cheap relaxed read first, CAS only
            // when the lock looks free, to avoid hammering the cache line
            unsigned locked = locked_.load(std::memory_order_relaxed);
            if (!locked &&
                locked_.compare_exchange_weak(locked, 1u,
                                              std::memory_order_acquire,
                                              std::memory_order_relaxed))
                break;
#if defined(BMSSE2OPT) || defined(BMSSE42OPT) || defined(BMAVX2OPT) || defined(BMAVX512OPT)
            _mm_pause();
#else
            // no pause instruction available: yield so we do not starve
            // the thread currently holding the lock (matches the fallback
            // used by the worker wait loops elsewhere in this file)
            std::this_thread::yield();
#endif
        } // while
    }

    /// Release the lock
    void unlock() noexcept
    {
        locked_.store(0u, std::memory_order_release);
    }

private:
    spin_lock(const spin_lock&) = delete;
    spin_lock& operator=(const spin_lock&) = delete;

private:
    std::atomic<unsigned> locked_{0}; ///< 0 - unlocked, non-zero - locked
    Pad                   p_;         ///< optional anti-false-sharing pad
};
/// Join every joinable thread in a container
///
/// @param tcont - container of std::thread (or API-compatible) objects
///
template<typename TCont>
void join_multiple_threads(TCont& tcont)
{
    for (auto& th : tcont)
    {
        if (th.joinable())
            th.join();
    } // for th
}
/// Typedef for a pthread-style call-back function pointer
///
typedef void* (*func_call_type)(void*);

/// Task descriptor, encapsulates a pthread-like callback with its argument
/// and return value
///
struct task_descr
{
    func_call_type func; ///< pointer to pthread-like function callback
    void*          argp; ///< arg pointer
    void*          ret;  ///< ret pointer

    /// Default ctor zero-initializes all members (the previous version
    /// left them uninitialized, making any accidental read undefined)
    task_descr() noexcept : func(0), argp(0), ret(0) {}

    /// @param f      - callback to run
    /// @param argptr - opaque argument passed to the callback
    explicit task_descr(func_call_type f, void* argptr = 0) noexcept
        : func(f), argp(argptr), ret(0)
    {}
};
template<typename QValue, typename Lock> class thread_pool;
/**
    Thread-safe task queue: std::queue protected by a Lock, paired with
    a condition variable used to wake worker threads on submission.
    NOTE: push() signals queue_push_cond_ WITHOUT holding signal_mut_,
    so a notification can be missed; consumers compensate by waiting
    with a timeout and re-polling (see thread_pool::worker_func).
    TODO: use data structures which can guarantee "noexcept" for trivial ops
*/
template<typename Value, typename Lock>
class queue_sync
{
public:
typedef Value value_type;
typedef Lock lock_type;
/// Construct an empty queue
///
queue_sync(){}
/// Push value to the back of the queue
/// @param v - value to put in the queue
///
/// @sa push_no_lock
///
void push(value_type v)
{
{
// scope the lock so it is released before notification
std::lock_guard<lock_type> lg(dq_lock_);
data_queue_.push(v);
}
// notify outside the queue lock; wakeup may be missed but workers
// re-poll on a timeout so progress is still guaranteed
queue_push_cond_.notify_one();
}
/// Push value to the back of the queue without lock protection
/// It is assumed that processing did not start and we are just staging
/// the batch
///
/// @param v - value to put in the queue
/// @sa push
///
void push_no_lock(value_type v) { data_queue_.push(v); }
/// Extract value
/// @param v - [out] value returned
/// @return true if extracted, false if the queue was empty
///
bool try_pop(value_type& v)
{
std::lock_guard<lock_type> guard(dq_lock_);
if (data_queue_.empty())
return false;
v = data_queue_.front();
data_queue_.pop();
return true;
}
/// @return true if the queue is empty (point-in-time snapshot)
bool empty() const
{
std::lock_guard<lock_type> guard(dq_lock_);
return data_queue_.empty();
}
/// lock the queue access (for bulk staging via push_no_lock)
/// @sa push_no_lock, unlock
void lock() { dq_lock_.lock(); }
/// unlock the queue access
/// @sa push_no_lock, lock
void unlock()
{
dq_lock_.unlock();
// lock-unlock is done to protect bulk push, need to wake up
// all waiting workers
queue_push_cond_.notify_all();
}
template<typename QV, typename L> friend class bm::thread_pool;
private:
queue_sync(const queue_sync&) = delete;
queue_sync& operator=(const queue_sync&) = delete;
private:
std::queue<value_type> data_queue_; ///< queue object
mutable lock_type dq_lock_; ///< lock for queue
// signal structure for wait on empty queue
protected:
mutable std::mutex signal_mut_; ///< signal mutex for q submissions
std::condition_variable queue_push_cond_; ///< mutex paired conditional
};
/**
Thread pool with custom (thread safe) queue
QValue - task queue value parameter
Lock - locking protection type (like std::mutex or spinlock)
*/
template<typename QValue, typename Lock>
class thread_pool
{
public:
typedef QValue value_type;
typedef Lock lock_type;
typedef bm::queue_sync<QValue, lock_type> queue_type;
/**
    Stop modes for threads:
    0 - keep running/waiting for jobs
    1 - wait for empty task queue then stop threads
    2 - stop threads now even if there are pending tasks
*/
enum stop_mode
{
no_stop = 0, ///< keep spinning on busy-wait
stop_when_done = 1, ///< stop when the task queue is empty
stop_now = 2 ///< stop right now
};
public:
/// @param sm - initial stop mode (default: keep threads running)
thread_pool(stop_mode sm = no_stop)
: stop_flag_(sm)
{}
/// Destructor: if no stop was requested yet, asks workers to finish
/// outstanding jobs, then joins all worker threads
~thread_pool()
{
int is_stop = stop_flag_;
if (!is_stop) // finish the outstanding jobs and close threads
set_stop_mode(stop_when_done);
join();
}
/** Setup the criteria for threads shutdown
    Also notifies all threads on a new directive
    @param sm - stop mode
*/
void set_stop_mode(stop_mode sm)
{
stop_flag_ = sm;
// wake all workers so they re-check the stop flag promptly
job_queue_.queue_push_cond_.notify_all();
}
/**
    Start thread pool worker threads.
    Does nothing if an immediate stop was already requested.
    @param tcount - number of threads to start
*/
void start(unsigned tcount)
{
int is_stop = stop_flag_.load(std::memory_order_relaxed);
if (is_stop == stop_now) // immediate stop requested
return;
// TODO: consider lock protect of thread_vect_ member
for(unsigned i = 0;i < tcount; ++i)
{
thread_vect_.emplace_back(
std::thread(&thread_pool::worker_func,this));
} // for
}
/**
    Wait for all worker threads to stop, then clear the thread list
*/
void join()
{
bm::join_multiple_threads(thread_vect_);
thread_vect_.resize(0);
}
/**
    Conditional spin-wait for the queue to empty.
    Uses a 10ms timed wait on task_done_cond_ because notifications
    are sent without the paired mutex held and may be missed; the
    timeout bounds how long a missed wakeup can delay us.
*/
void wait_empty_queue()
{
const std::chrono::duration<int, std::milli> wait_duration(10);
while(1)
{
if (job_queue_.empty())
break;
std::cv_status wait_res;
{
std::unique_lock<std::mutex> lk(task_done_mut_);
wait_res = task_done_cond_.wait_for(lk, wait_duration);
}
if (wait_res == std::cv_status::timeout)
{
#if defined(BMSSE2OPT) || defined(BMSSE42OPT) || defined(BMAVX2OPT) || defined(BMAVX512OPT)
_mm_pause();
#else
std::this_thread::yield();
#endif
}
} // while
}
/// Get access to the job submission queue
///
queue_type& get_job_queue() noexcept { return job_queue_; }
protected:
/// Internal worker wrapper with busy-wait spin loop
/// making pthread-like call for tasks
///
void worker_func()
{
const std::chrono::duration<int, std::milli> wait_duration(10);
while(1)
{
int is_stop = stop_flag_.load(std::memory_order_relaxed);
if (is_stop == stop_now) // immediate stop requested
break;
// NOTE: local variable intentionally shadows the bm::task_descr type
task_descr task_descr;
if (job_queue_.try_pop(task_descr))
{
// TODO: consider try-catch here
void* ret = task_descr.func(task_descr.argp);
(void)ret; // TODO: add results processing
task_done_cond_.notify_one();
continue;
}
// queue appears to be empty, check if requested to stop
// (any non-zero mode stops the worker once the queue is drained)
is_stop = stop_flag_.load(std::memory_order_relaxed);
if (is_stop)
return;
// enter a temporal condition wait
// notifications are treated as unreliable, re-verified
// via spin over the poll of the queue
std::cv_status wait_res;
{
std::unique_lock<std::mutex> lk(job_queue_.signal_mut_);
wait_res =
job_queue_.queue_push_cond_.wait_for(lk, wait_duration);
}
if (wait_res == std::cv_status::timeout)
{
is_stop = stop_flag_.load(std::memory_order_relaxed);
if (is_stop == stop_now) // immediate stop requested
return;
std::this_thread::yield();
}
} // while
}
private:
thread_pool(const thread_pool&)=delete;
thread_pool& operator=(const thread_pool&)=delete;
private:
queue_type job_queue_; ///< queue (thread sync)
std::vector<std::thread> thread_vect_; ///< threads servicing queue
std::atomic<int> stop_flag_{0}; ///< stop flag to all threads
// notification channel for results wait
mutable std::mutex task_done_mut_; ///< signal mutex for task done
std::condition_variable task_done_cond_;///< mutex paired conditional
};
} // bm