1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*******************************************************************************
* tests/thread_pool_test.cpp
*
* Part of tlx - http://panthema.net/tlx
*
* Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
*
* All rights reserved. Published under the Boost Software License, Version 1.0
******************************************************************************/
// this makes sleep_for() available in older GCC versions
#define _GLIBCXX_USE_NANOSLEEP
#include <numeric>
#include <string>
#include <vector>
#include <tlx/die.hpp>
#include <tlx/thread_pool.hpp>
void test_loop_until_empty() {
size_t job_num = 256;
std::vector<size_t> result1(job_num, 0), result2(job_num, 0);
{
tlx::ThreadPool pool(8);
for (size_t r = 0; r != 16; ++r) {
for (size_t i = 0; i != job_num; ++i) {
pool.enqueue(
[i, &result1, &result2, &pool]() {
// set flag
result1[i] = 1 + i;
// enqueue more work.
pool.enqueue(
[i, &result2]() {
result2[i] = 2 + i;
});
});
}
pool.loop_until_empty();
}
}
// check that the threads have run
for (size_t i = 0; i != job_num; ++i) {
die_unequal(result1[i], 1 + i);
die_unequal(result2[i], 2 + i);
}
}
void test_loop_until_terminate(size_t sleep_msec) {
size_t job_num = 256;
std::vector<int> result1(job_num, 0), result2(job_num, 0);
std::chrono::milliseconds sleep_time(sleep_msec);
tlx::ThreadPool pool(8);
for (size_t i = 0; i != job_num; ++i) {
pool.enqueue(
[i, &result1, &result2, &pool, &sleep_time]() {
// set flag
result1[i] = 1;
std::this_thread::sleep_for(sleep_time);
// enqueue more work: how to call this lambda again?
pool.enqueue(
[i, &result2, &sleep_time]() {
result2[i] = 1;
std::this_thread::sleep_for(sleep_time);
});
});
}
using steady_clock = std::chrono::steady_clock;
steady_clock::time_point tp_start = steady_clock::now();
// start thread which will stop the thread pool (if we would enqueue this as
// job, it would be no different from the first test).
std::thread stopper_thr = std::thread(
[&pool]() {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
pool.terminate();
});
pool.loop_until_terminate();
stopper_thr.join();
// check that it didn't terminate immediately.
auto tp_delta = std::chrono::duration_cast<std::chrono::milliseconds>(
steady_clock::now() - tp_start);
die_unless(tp_delta.count() >= 90);
// check result: count number of flags set.
size_t sum = std::accumulate(result1.begin(), result1.end(), 0);
sum += std::accumulate(result2.begin(), result2.end(), 0);
die_unequal(sum, pool.done());
}
void test_init_thread() {
std::atomic<size_t> count { 0 };
{
tlx::ThreadPool pool(
/* num_threads */ 8,
/* thread initializer */
[&](size_t i) { count += i; });
pool.loop_until_empty();
}
die_unequal(count.load(), (7 * 8) / 2u);
}
int main() {
test_loop_until_empty();
for (size_t i = 0; i < 10; ++i)
test_loop_until_terminate(i);
test_init_thread();
return 0;
}
/******************************************************************************/