1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
#ifndef BMTPOOL__H__INCLUDED__
#define BMTPOOL__H__INCLUDED__
/*
Copyright(c) 2002-2021 Anatoliy Kuznetsov(anatoliy_kuznetsov at yahoo.com)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
For more information please visit: http://bitmagic.io
*/
#include <type_traits>
#include <queue>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include "bmbuffer.h"
#include "bmtask.h"
namespace bm
{
/// Pad of 60 bytes so that the enclosing object occupies a full
/// 64 bytes (one CPU cache line) and avoids false sharing
/// @internal
struct pad60_struct { char c[60]; };
/// Empty padding placeholder (no extra cache-line padding requested)
/// @internal
struct pad0_struct { };
/**
    Spin-lock with two-phase acquire (relaxed read + CAS)

    Padding parameter optionally adds a buffer to avoid CPU cache
    line contention.
    TODO: test if padding really helps in our case

    Generally spin_lock does not have an advantage over std::mutex
    but in some specific cases like WebAssembly it may be better
    due to its "noexcept" property

    @ingroup bmtasks
*/
template<class Pad = bm::pad0_struct>
class spin_lock
{
public:
    spin_lock() noexcept : locked_(0) {}

    /// Lock the lock (busy-wait until acquired)
    void lock() noexcept
    {
        while(1) // spin loop
        {
            // cheap relaxed read first: avoids hammering the cache line
            // with RMW traffic while the lock is held by another thread
            unsigned locked = locked_.load(std::memory_order_relaxed);
            if (!locked &&
                locked_.compare_exchange_weak(locked, true,
                                              std::memory_order_acquire))
                break;
        #if defined(BMSSE2OPT) || defined(BMSSE42OPT) || defined(BMAVX2OPT) || defined(BMAVX512OPT)
            _mm_pause();
        #endif
        } // while
    }

    /// Try to acquire the lock once, return true if successful
    ///
    bool try_lock() noexcept
    {
        unsigned locked = locked_.load(std::memory_order_relaxed);
        // NOTE: compare_exchange_strong here - the weak form may fail
        // spuriously even when the lock is free, which would make a
        // single-shot try_lock() report false "busy" on an
        // uncontended lock
        if (!locked &&
            locked_.compare_exchange_strong(locked, true,
                                            std::memory_order_acquire))
            return true;
        return false;
    }

    /// Unlock the lock
    void unlock() noexcept
    {
        // release store pairs with the acquire CAS in lock()/try_lock()
        locked_.store(false, std::memory_order_release);
    }

private:
    spin_lock(const spin_lock&)=delete;
    spin_lock& operator=(const spin_lock&)=delete;
private:
    std::atomic<unsigned> locked_; ///< 0 - unlocked, non-zero - locked
    Pad                   p_;      ///< optional anti-false-sharing padding
};
/// Join (wait for) every joinable thread in the container
///
/// @param tcont - container of std::thread-like objects
///
/// @internal
/// @ingroup bmtasks
///
template<typename TCont>
void join_multiple_threads(TCont& tcont)
{
    for (auto& th : tcont)
    {
        if (th.joinable())
            th.join();
    } // for
}
template<typename QValue, typename Lock> class thread_pool;

/**
    Thread-sync queue with MT access protection

    @ingroup bmtasks
    @internal
*/
template<typename Value, typename Lock>
class queue_sync
{
public:
    typedef Value value_type;
    typedef Lock  lock_type;

    /// constructor
    ///
    queue_sync() noexcept {}

    /// Push value to the back of the queue
    /// @param v - value to put in the queue
    ///
    /// @sa push_no_lock
    ///
    void push(const value_type& v) //noexcept(bm::is_lock_noexcept<lock_type>::value)
    {
        {
            std::lock_guard<lock_type> lg(dq_lock_);
            data_queue_.push(v);
        }
        // notify outside the lock so a woken worker does not
        // immediately block on dq_lock_
        queue_push_cond_.notify_one(); // noexcept
    }

    /// Push value to the back of the queue without lock protection
    /// It is assumed that processing did not start and we are just staging
    /// the batch
    ///
    /// @param v - value to put in the queue
    /// @sa push
    ///
    void push_no_lock(const value_type& v)
    {
        data_queue_.push(v);
    }

    /// Extract value
    /// @param v - [out] value returned
    /// @return true if extracted
    ///
    bool try_pop(value_type& v)
    {
        std::lock_guard<lock_type> guard(dq_lock_);
        if (data_queue_.empty())
            return false;
        // move the element out: it is popped right after, so avoid
        // a needless copy for move-enabled value types
        v = std::move(data_queue_.front());
        data_queue_.pop();
        return true;
    }

    /// @return true if empty
    bool empty() const //noexcept(bm::is_lock_noexcept<lock_type>::value)
    {
        std::lock_guard<lock_type> guard(dq_lock_);
        return data_queue_.empty();
    }

    /// lock the queue access
    /// @sa push_no_lock, unlock
    void lock() noexcept(bm::is_lock_noexcept<lock_type>::value)
    { dq_lock_.lock(); }

    /// Try to lock the queue exclusively
    ///
    bool try_lock() noexcept(bm::is_lock_noexcept<lock_type>::value)
    { return dq_lock_.try_lock(); }

    /// unlock the queue access
    /// @sa push_no_lock, lock
    void unlock() noexcept(bm::is_lock_noexcept<lock_type>::value)
    {
        dq_lock_.unlock();
        // lock-unlock is done to protect bulk push, need to wake up
        // all waiting workers
        queue_push_cond_.notify_all(); // noexcept
    }

    template<typename QV, typename L> friend class bm::thread_pool;

protected:
    typedef std::queue<value_type> queue_type;

private:
    queue_sync(const queue_sync&) = delete;
    queue_sync& operator=(const queue_sync&) = delete;

private:
    queue_type        data_queue_; ///< queue object
    mutable lock_type dq_lock_;    ///< lock for queue

// signal structure for wait on empty queue
protected:
    mutable std::mutex      signal_mut_;      ///< signal mutex for q submissions
    std::condition_variable queue_push_cond_; ///< mutex paired conditional
};
/**
    Thread pool with custom (thread safe) queue

    Thread pool implements a busy-wait task stealing
    design pattern

    QValue - task queue value parameter
    Lock   - locking protection type (like std::mutex or spinlock)

    @ingroup bmtasks
*/
template<typename QValue, typename Lock>
class thread_pool
{
public:
    typedef QValue value_type;
    typedef Lock   lock_type;
    typedef bm::queue_sync<QValue, lock_type> queue_type;

    /**
        Stop modes for threads:
        0 - keep running/waiting for jobs
        1 - wait for empty task queue then stop threads
        2 - stop threads now even if there are pending tasks
    */
    enum stop_mode
    {
        no_stop        = 0, ///< keep spinning on busy-wait
        stop_when_done = 1, ///< stop if task queue is empty
        stop_now       = 2  ///< stop right now
    };

public:
    /// @param sm - initial stop mode (default: keep workers running)
    thread_pool(stop_mode sm = no_stop) noexcept
        : stop_flag_(sm)
    {}

    /// Destructor: finishes outstanding jobs (unless a stop was
    /// already requested) and joins all worker threads
    ~thread_pool();

    /** Setup the criteria for threads shutdown
        Also notifies all threads on a new directive
        @param sm - stop mode
    */
    void set_stop_mode(stop_mode sm) noexcept;

    /**
        Request an immediate stop of all threads in the pool
    */
    void stop() noexcept { set_stop_mode(stop_now); }

    /**
        Start thread pool worker threads.
        @param tcount - number of threads to start
    */
    void start(unsigned tcount);

    /**
        Wait for threads to finish (or stop if stop was requested)
    */
    void join();

    /**
        Conditional spin-wait for the queue to empty
        (Important note: tasks may still be running, but the queue is empty)
    */
    void wait_empty_queue();

    /// Get access to the job submission queue
    ///
    queue_type& get_job_queue() noexcept { return job_queue_; }

    /// Return if thread pool is stopped by a request
    /// (returns one of the stop_mode values)
    int is_stopped() const noexcept
    { return stop_flag_.load(std::memory_order_relaxed); }

protected:
    /// Internal worker wrapper with busy-wait spin loop
    /// making pthread-like call for tasks
    ///
    void worker_func();

private:
    thread_pool(const thread_pool&)=delete;
    thread_pool& operator=(const thread_pool&)=delete;

private:
    queue_type               job_queue_;    ///< queue (thread sync)
    std::vector<std::thread> thread_vect_;  ///< threads servicing queue
    std::atomic_int          stop_flag_{0}; ///< stop flag to all threads

    // notification channel for results wait
    mutable std::mutex      task_done_mut_;  ///< signal mutex for task done
    std::condition_variable task_done_cond_; ///< mutex paired conditional
};
/**
    Utility class to submit task batch to the running thread pool
    and optionally wait for it getting done

    @ingroup bmtasks
*/
template<typename TPool>
class thread_pool_executor
{
public:
    typedef TPool thread_pool_type;
    typedef task_batch_base::size_type size_type;

public:
    thread_pool_executor() {}

    /// Submit all tasks of the batch to the pool's job queue;
    /// tasks flagged as barriers are executed on the calling thread
    /// @param tpool - running thread pool
    /// @param tasks - batch of task descriptors
    /// @param wait_for_batch - if true, block until the batch is done
    static
    void run(thread_pool_type&    tpool,
             bm::task_batch_base& tasks,
             bool                 wait_for_batch);

    /**
        Check if all batch jobs in the specified interval are done
        Spin wait if not.
    */
    static
    void wait_for_batch_done(thread_pool_type&          tpool,
                             bm::task_batch_base&       tasks,
                             task_batch_base::size_type from_idx,
                             task_batch_base::size_type to_idx);

private:
    thread_pool_executor(const thread_pool_executor&) = delete;
    thread_pool_executor& operator=(const thread_pool_executor&) = delete;
};
// ========================================================================
// thread_pool<> implementations
// ========================================================================
// -----------------------------------------------------------------------
template<typename QValue, typename Lock>
thread_pool<QValue, Lock>::~thread_pool()
{
    // no stop requested yet: let workers drain the remaining jobs
    // before shutting the threads down
    if (!stop_flag_.load())
        set_stop_mode(stop_when_done);
    join();
}
// -----------------------------------------------------------------------
template<typename QValue, typename Lock>
void thread_pool<QValue, Lock>::set_stop_mode(stop_mode sm) noexcept
{
    stop_flag_.store(sm);                     // atomic (seq_cst) publish
    // wake every worker so it re-reads the stop directive (noexcept)
    job_queue_.queue_push_cond_.notify_all();
}
// -----------------------------------------------------------------------
template<typename QValue, typename Lock>
void thread_pool<QValue, Lock>::start(unsigned tcount)
{
    int is_stop = stop_flag_.load(std::memory_order_relaxed);
    if (is_stop == stop_now) // immediate stop requested
        return;
    // TODO: consider lock protect of thread_vect_ member
    thread_vect_.reserve(thread_vect_.size() + tcount);
    for (unsigned i = 0; i < tcount; ++i)
    {
        // construct the thread in place: emplace_back(std::thread(...))
        // would build a temporary and move it
        thread_vect_.emplace_back(&thread_pool::worker_func, this);
    } // for
}
// -----------------------------------------------------------------------
template<typename QValue, typename Lock>
void thread_pool<QValue, Lock>::join()
{
    // wait for every worker to exit, then drop the (joined) handles
    bm::join_multiple_threads(thread_vect_);
    thread_vect_.clear();
}
// -----------------------------------------------------------------------
template<typename QValue, typename Lock>
void thread_pool<QValue, Lock>::wait_empty_queue()
{
    // re-poll the queue at least every 20ms even if no task-done
    // notification arrives (notifications here are best-effort)
    const std::chrono::duration<int, std::milli> wait_duration(20);
    while(1)
    {
        if (job_queue_.empty())
            break;
        std::cv_status wait_res;
        {
            // bounded wait for a "task done" signal from a worker
            std::unique_lock<std::mutex> lk(task_done_mut_);
            wait_res = task_done_cond_.wait_for(lk, wait_duration);
        }
        if (wait_res == std::cv_status::timeout)
        {
            std::this_thread::yield();
            int is_stop = is_stopped();
            if (is_stop == stop_now) // immediate stop requested
                return;
        }
    } // while
}
// -----------------------------------------------------------------------
template<typename QValue, typename Lock>
void thread_pool<QValue, Lock>::worker_func()
{
    // wake-up interval to re-poll the queue when no push notification
    // arrives (notifications are treated as unreliable - see below)
    const std::chrono::duration<int, std::milli> wait_duration(10);
    while(1)
    {
        int is_stop = is_stopped();
        if (is_stop == stop_now) // immediate stop requested
            break;
        bm::task_descr* task_descr;
        if (job_queue_.try_pop(task_descr))
        {
            BM_ASSERT(task_descr->done == 0);
            try
            {
                // run the task; result code is stored in the descriptor
                // for the submitter to inspect
                task_descr->err_code = task_descr->func(task_descr->argp);
            }
            catch (...)
            {
                // a worker thread must never leak exceptions:
                // report failure through the error code instead
                task_descr->err_code = -1;
            }
            // release store pairs with the acquire loads of 'done'
            // in thread_pool_executor::wait_for_batch_done()
            task_descr->done.store(1, std::memory_order_release);
            task_done_cond_.notify_one();
            continue;
        }
        // queue appears to be empty, check if requested to stop
        // (covers both stop_when_done and stop_now)
        is_stop = is_stopped();
        if (is_stop)
            return;
        // enter a temporal condition wait
        // notifications are treated as unreliable re-verified
        // via spin over the poll of the queue
        std::cv_status wait_res;
        {
            std::unique_lock<std::mutex> lk(job_queue_.signal_mut_);
            wait_res =
                job_queue_.queue_push_cond_.wait_for(lk, wait_duration);
        }
        if (wait_res == std::cv_status::timeout)
        {
            is_stop = is_stopped();
            if (is_stop == stop_now) // immediate stop requested
                return;
            std::this_thread::yield();
        }
    } // while
    return;
}
// ========================================================================
// thread_pool_executor<> implementations
// ========================================================================
template<typename TPool>
void thread_pool_executor<TPool>::run(
                    thread_pool_type&    tpool,
                    bm::task_batch_base& task_batch,
                    bool                 wait_for_batch)
{
    typename thread_pool_type::queue_type& qu = tpool.get_job_queue();
    task_batch_base::size_type batch_size = task_batch.size();
    for (task_batch_base::size_type i = 0; i < batch_size; ++i)
    {
        bm::task_descr* tdescr = task_batch.get_task(i);
        tdescr->argp = tdescr; // restore the self reference
        BM_ASSERT(!tdescr->done);

        if (tdescr->flags != bm::task_descr::no_flag) // barrier task ?
        {
            if (i) // wait until all previously scheduled tasks are done
            {
                tpool.wait_empty_queue();
                // wait only for the tasks submitted so far: [0, i-1].
                // Tasks at [i, batch_size) (including this barrier, whose
                // done flag is still 0) have not been pushed yet - waiting
                // on them here would spin forever
                wait_for_batch_done(tpool, task_batch, 0, i - 1);
            }
            // run the barrier proc on the current thread
            tdescr->err_code = tdescr->func(tdescr->argp);
            tdescr->done.store(1, std::memory_order_release);

            // re-read the batch size, if barrier added more tasks
            task_batch_base::size_type new_batch_size = task_batch.size();
            if (new_batch_size != batch_size)
                batch_size = new_batch_size;
            continue;
        }
        qu.push(tdescr); // locked push to the thread queue
        auto is_stop = tpool.is_stopped();
        if (is_stop == thread_pool_type::stop_now)
            break; // thread pool stop requested
    } // for

    // implicit wait barrier for all tasks
    if (wait_for_batch && batch_size)
    {
        tpool.wait_empty_queue();
        wait_for_batch_done(tpool, task_batch, 0, batch_size - 1);
    }
}
// -----------------------------------------------------------------------
template<typename TPool>
void thread_pool_executor<TPool>::wait_for_batch_done(
                thread_pool_type&          tpool,
                bm::task_batch_base&       tasks,
                task_batch_base::size_type from_idx,
                task_batch_base::size_type to_idx)
{
    BM_ASSERT(from_idx <= to_idx);
    BM_ASSERT(to_idx < tasks.size());

    for (task_batch_base::size_type i = from_idx; i <= to_idx; ++i)
    {
        const bm::task_descr* tdescr = tasks.get_task(i);
        // acquire pairs with the worker's release store of 'done'.
        // (was memory_order_consume: deprecated since C++17 and
        // implemented as acquire anyway; also matches the load below)
        auto done = tdescr->done.load(std::memory_order_acquire);
        while (!done)
        {
            auto is_stop = tpool.is_stopped();
            if (is_stop == thread_pool_type::stop_now)
                return; // thread pool stopped, jobs will not be done
            std::this_thread::yield();
            // TODO: subscribe to a conditional wait for job done in tpool
            done = tdescr->done.load(std::memory_order_acquire);
        } // while
    } // for
}
// -----------------------------------------------------------------------
} // bm
#endif