bitcoin_checkqueue/checkqueue.rs
1crate::ix!();
2
3//-------------------------------------------[.cpp/bitcoin/src/checkqueue.h]
4
/**
  | Queue for verifications that have to
  | be performed.
  |
  | The verifications are represented
  | by a type T. In the C++ original, T must
  | provide an operator() returning a bool;
  | this definition declares no trait bound
  | on T.
  |
  | One thread (the master) is assumed to
  | push batches of verifications onto
  | the queue, where they are processed
  | by N-1 worker threads. When the master
  | is done adding work, it temporarily
  | joins the worker pool as an N'th worker,
  | until all jobs are done.
  |
  */
pub struct CheckQueue<T> {

    /**
      | Mutex to protect the inner state.
      |
      | CheckQueueInner is declared outside
      | this definition; presumably it holds
      | the pending checks and the bookkeeping
      | counters -- TODO confirm.
      |
      */
    mutex_: std::sync::Mutex<CheckQueueInner<T>>,

    /**
      | Worker threads block on this when out
      | of work
      |
      */
    worker_cv: std::sync::Condvar,

    /**
      | Master thread blocks on this when out
      | of work
      |
      */
    master_cv: std::sync::Condvar,

    /**
      | The maximum number of elements to be
      | processed in one batch
      |
      */
    n_batch_size: u32,

    /**
      | The worker threads created for this
      | queue.
      |
      | NOTE(review): `Thread` is resolved
      | outside this definition; the C++
      | original joins every element of this
      | list, so the type presumably has to be
      | joinable -- confirm.
      |
      */
    worker_threads: Vec<Thread>,

    /**
      | Mutex to ensure only one concurrent
      | CCheckQueueControl (the name used by
      | the C++ original)
      |
      */
    control_mutex: RawMutex,
}
60
61impl<T> Drop for CheckQueue<T> {
62
63 fn drop(&mut self) {
64 todo!();
65 /*
66 assert(m_worker_threads.empty());
67 */
68 }
69}
70
71impl<T> CheckQueue<T> {
72
73 /**
74 | Internal function that does bulk of
75 | the verification work.
76 |
77 */
78 pub fn loop_(&mut self, master: bool) -> bool {
79
80 todo!();
81 /*
82 std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
83 std::vector<T> vChecks;
84 vChecks.reserve(nBatchSize);
85 unsigned int nNow = 0;
86 bool fOk = true;
87 do {
88 {
89 WAIT_LOCK(m_mutex, lock);
90 // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
91 if (nNow) {
92 fAllOk &= fOk;
93 nTodo -= nNow;
94 if (nTodo == 0 && !fMaster)
95 // We processed the last element; inform the master it can exit and return the result
96 m_master_cv.notify_one();
97 } else {
98 // first iteration
99 nTotal++;
100 }
101 // logically, the do loop starts here
102 while (queue.empty() && !m_request_stop) {
103 if (fMaster && nTodo == 0) {
104 nTotal--;
105 bool fRet = fAllOk;
106 // reset the status for new work later
107 fAllOk = true;
108 // return the current status
109 return fRet;
110 }
111 nIdle++;
112 cond.wait(lock); // wait
113 nIdle--;
114 }
115 if (m_request_stop) {
116 return false;
117 }
118
119 // Decide how many work units to process now.
120 // * Do not try to do everything at once, but aim for increasingly smaller batches so
121 // all workers finish approximately simultaneously.
122 // * Try to account for idle jobs which will instantly start helping.
123 // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
124 nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
125 vChecks.resize(nNow);
126 for (unsigned int i = 0; i < nNow; i++) {
127 // We want the lock on the m_mutex to be as short as possible, so swap jobs from the global
128 // queue to the local batch vector instead of copying.
129 vChecks[i].swap(queue.back());
130 queue.pop_back();
131 }
132 // Check whether we need to do work at all
133 fOk = fAllOk;
134 }
135 // execute work
136 for (T& check : vChecks)
137 if (fOk)
138 fOk = check();
139 vChecks.clear();
140 } while (true);
141 */
142 }
143
144 /**
145 | Create a new check queue
146 |
147 */
148 pub fn new(n_batch_size_in: u32) -> Self {
149
150 todo!();
151 /*
152 : n_batch_size(nBatchSizeIn),
153
154
155 */
156 }
157
158 /**
159 | Create a pool of new worker threads.
160 |
161 */
162 pub fn start_worker_threads(&mut self, threads_num: i32) {
163
164 todo!();
165 /*
166 {
167 LOCK(m_mutex);
168 nIdle = 0;
169 nTotal = 0;
170 fAllOk = true;
171 }
172
173 assert(m_worker_threads.empty());
174
175 for (int n = 0; n < threads_num; ++n) {
176
177 m_worker_threads.emplace_back([this, n]() {
178 util::ThreadRename(strprintf("scriptch.%i", n));
179 SetSyscallSandboxPolicy(SyscallSandboxPolicy::VALIDATION_SCRIPT_CHECK);
180
181 /* worker thread */
182 Loop(false );
183 });
184 }
185 */
186 }
187
188 /**
189 | Wait until execution finishes, and
190 | return whether all evaluations were
191 | successful.
192 |
193 */
194 pub fn wait(&mut self) -> bool {
195
196 todo!();
197 /*
198 /* master thread */
199 return Loop(true );
200 */
201 }
202
203 /**
204 | Add a batch of checks to the queue
205 |
206 */
207 pub fn add(&mut self, checks: &mut Vec<T>) {
208
209 todo!();
210 /*
211 LOCK(m_mutex);
212 for (T& check : vChecks) {
213 queue.push_back(T());
214 check.swap(queue.back());
215 }
216 nTodo += vChecks.size();
217 if (vChecks.size() == 1)
218 m_worker_cv.notify_one();
219 else if (vChecks.size() > 1)
220 m_worker_cv.notify_all();
221 */
222 }
223
224 /**
225 | Stop all of the worker threads.
226 |
227 */
228 pub fn stop_worker_threads(&mut self) {
229
230 todo!();
231 /*
232
233 [&]() { LOCK(m_mutex); m_request_stop = true }()
234 ;
235 m_worker_cv.notify_all();
236 for (std::thread& t : m_worker_threads) {
237 t.join();
238 }
239 m_worker_threads.clear();
240
241 [&]() { LOCK(m_mutex); m_request_stop = false }()
242 ;
243 */
244 }
245}