bitcoin_checkqueue/
checkqueue.rs

crate::ix!();

//-------------------------------------------[.cpp/bitcoin/src/checkqueue.h]

/**
  | Queue for verifications that have to
  | be performed.
  |
  | The verifications are represented
  | by a type T, which must provide an operator(),
  | returning a bool.
  |
  | One thread (the master) is assumed to
  | push batches of verifications onto
  | the queue, where they are processed
  | by N-1 worker threads. When the master
  | is done adding work, it temporarily
  | joins the worker pool as an N'th worker,
  | until all jobs are done.
  |
  */
pub struct CheckQueue<T> {

    /**
      | Mutex to protect the inner state
      |
      */
    mutex_: std::sync::Mutex<CheckQueueInner<T>>,

    /**
      | Worker threads block on this when out
      | of work
      |
      */
    worker_cv: std::sync::Condvar,

    /**
      | Master thread blocks on this when out
      | of work
      |
      */
    master_cv: std::sync::Condvar,

    /**
      | The maximum number of elements to be
      | processed in one batch
      |
      */
    n_batch_size: u32,

    worker_threads: Vec<Thread>,

    /**
      | Mutex to ensure only one concurrent
      | CCheckQueueControl
      |
      */
    control_mutex: RawMutex,
}
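
// Intended call pattern, pieced together from the type-level comment above and
// the upstream C++ usage (a hedged sketch only: the method bodies below are
// still todo!() stubs, and `ScriptCheck`, `n_threads` and `checks` are
// placeholders, not items defined by this crate):
//
//     let mut queue: CheckQueue<ScriptCheck> = CheckQueue::new(128);
//     queue.start_worker_threads(n_threads);
//
//     // master thread, once per batch of work:
//     queue.add(&mut checks);        // hand a batch to the worker pool
//     let all_ok = queue.wait();     // join as the N'th worker until all done
//
//     // before the queue is dropped (Drop asserts the pool is already empty):
//     queue.stop_worker_threads();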

impl<T> Drop for CheckQueue<T> {

    fn drop(&mut self) {
        todo!();
        /*
            assert(m_worker_threads.empty());
        */
    }
}

impl<T> CheckQueue<T> {

    /**
      | Internal function that does the bulk
      | of the verification work.
      |
      */
    pub fn loop_(&mut self, master: bool) -> bool {
        todo!();
        /*
            std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
            std::vector<T> vChecks;
            vChecks.reserve(nBatchSize);
            unsigned int nNow = 0;
            bool fOk = true;
            do {
                {
                    WAIT_LOCK(m_mutex, lock);
                    // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
                    if (nNow) {
                        fAllOk &= fOk;
                        nTodo -= nNow;
                        if (nTodo == 0 && !fMaster)
                            // We processed the last element; inform the master it can exit and return the result
                            m_master_cv.notify_one();
                    } else {
                        // first iteration
                        nTotal++;
                    }
                    // logically, the do loop starts here
                    while (queue.empty() && !m_request_stop) {
                        if (fMaster && nTodo == 0) {
                            nTotal--;
                            bool fRet = fAllOk;
                            // reset the status for new work later
                            fAllOk = true;
                            // return the current status
                            return fRet;
                        }
                        nIdle++;
                        cond.wait(lock); // wait
                        nIdle--;
                    }
                    if (m_request_stop) {
                        return false;
                    }

                    // Decide how many work units to process now.
                    // * Do not try to do everything at once, but aim for increasingly smaller batches so
                    //   all workers finish approximately simultaneously.
                    // * Try to account for idle jobs which will instantly start helping.
                    // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
                    nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
                    vChecks.resize(nNow);
                    for (unsigned int i = 0; i < nNow; i++) {
                        // We want the lock on the m_mutex to be as short as possible, so swap jobs from the global
                        // queue to the local batch vector instead of copying.
                        vChecks[i].swap(queue.back());
                        queue.pop_back();
                    }
                    // Check whether we need to do work at all
                    fOk = fAllOk;
                }
                // execute work
                for (T& check : vChecks)
                    if (fOk)
                        fOk = check();
                vChecks.clear();
            } while (true);
        */
    }
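
    // A worked example of the batching heuristic above (illustrative numbers
    // only): with nBatchSize = 128, queue.size() = 1000, nTotal = 4 and
    // nIdle = 2, a worker grabs
    //
    //     nNow = max(1, min(128, 1000 / (4 + 2 + 1))) = max(1, min(128, 142)) = 128
    //
    // checks, while later in the same run queue.size() = 350 yields
    // min(128, 350 / 7) = 50, so batches shrink as the queue drains and all
    // workers tend to finish at roughly the same time.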

    /**
      | Create a new check queue
      |
      */
    pub fn new(n_batch_size_in: u32) -> Self {
        todo!();
        /*
            : n_batch_size(nBatchSizeIn),
        */
    }
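
    // Based on the C++ initializer above, the eventual Rust body is expected
    // to do little more than store `n_batch_size_in` in `n_batch_size`,
    // leaving the state behind `mutex_` empty and the worker pool unstarted
    // (threads are created separately by start_worker_threads). This is an
    // assumption drawn from the upstream constructor, not a description of
    // the final port.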

    /**
      | Create a pool of new worker threads.
      |
      */
    pub fn start_worker_threads(&mut self, threads_num: i32) {
        todo!();
        /*
            {
                LOCK(m_mutex);
                nIdle = 0;
                nTotal = 0;
                fAllOk = true;
            }

            assert(m_worker_threads.empty());

            for (int n = 0; n < threads_num; ++n) {

                m_worker_threads.emplace_back([this, n]() {
                    util::ThreadRename(strprintf("scriptch.%i", n));
                    SetSyscallSandboxPolicy(SyscallSandboxPolicy::VALIDATION_SCRIPT_CHECK);

                    Loop(false /* worker thread */);
                });
            }
        */
    }
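
    // A minimal Rust sketch of the same spawning step, assuming the handles
    // can be joined later (i.e. that `worker_threads` ends up holding
    // std::thread::JoinHandle-style values); the `shared` placeholder and the
    // exact loop call are illustrative, not this crate's final code:
    //
    //     for n in 0..threads_num {
    //         let shared = /* handle to the shared queue state */;
    //         let handle = std::thread::Builder::new()
    //             .name(format!("scriptch.{}", n))
    //             .spawn(move || { shared.loop_(false); })
    //             .expect("failed to spawn script-check worker");
    //         self.worker_threads.push(handle);
    //     }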

    /**
      | Wait until execution finishes, and
      | return whether all evaluations were
      | successful.
      |
      */
    pub fn wait(&mut self) -> bool {
        todo!();
        /*
            return Loop(true /* master thread */);
        */
    }
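
    // Per the type-level comment, this is where the master temporarily joins
    // the pool as the N'th worker: it runs loop_(true) and only returns once
    // every queued check has been executed, reporting whether all of them
    // succeeded.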

    /**
      | Add a batch of checks to the queue
      |
      */
    pub fn add(&mut self, checks: &mut Vec<T>) {
        todo!();
        /*
            LOCK(m_mutex);
            for (T& check : vChecks) {
                queue.push_back(T());
                check.swap(queue.back());
            }
            nTodo += vChecks.size();
            if (vChecks.size() == 1)
                m_worker_cv.notify_one();
            else if (vChecks.size() > 1)
                m_worker_cv.notify_all();
        */
    }
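
    // A hedged Rust rendering of the same wake-up policy, assuming the state
    // behind `mutex_` exposes a `queue` and an `n_todo` counter (those field
    // names are assumptions about CheckQueueInner, not its actual definition):
    //
    //     let n = checks.len();
    //     {
    //         let mut inner = self.mutex_.lock().unwrap();
    //         inner.queue.extend(checks.drain(..));
    //         inner.n_todo += n;
    //     }
    //     if n == 1 {
    //         self.worker_cv.notify_one();
    //     } else if n > 1 {
    //         self.worker_cv.notify_all();
    //     }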

    /**
      | Stop all of the worker threads.
      |
      */
    pub fn stop_worker_threads(&mut self) {
        todo!();
        /*
            WITH_LOCK(m_mutex, m_request_stop = true);
            m_worker_cv.notify_all();
            for (std::thread& t : m_worker_threads) {
                t.join();
            }
            m_worker_threads.clear();
            WITH_LOCK(m_mutex, m_request_stop = false);
        */
    }
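
    // The C++ reference above sets the stop flag under the lock, wakes every
    // worker, joins them all, and finally clears the flag so the pool can be
    // started again later; a port would keep that order (flag, notify_all,
    // join, reset). Note that Drop only asserts the pool is already empty, so
    // this must run before the queue goes out of scope.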
}
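
// ---------------------------------------------------------------------------
// Illustrative, self-contained sketch of the master/worker hand-off described
// in the type-level comment above, built only on std primitives. It is not
// this crate's implementation (that lives behind the todo!() stubs); every
// name in it is local to this test and exists only to show the shape of the
// queue / worker_cv / master_cv interaction in runnable form.
// ---------------------------------------------------------------------------
#[cfg(test)]
mod master_worker_sketch {
    use std::collections::VecDeque;
    use std::sync::{Arc, Condvar, Mutex};
    use std::thread;

    /// Shared state playing the role of CheckQueueInner: the pending checks,
    /// the running conjunction of results, the outstanding-work counter and
    /// the stop flag.
    struct Shared {
        queue: VecDeque<Box<dyn FnOnce() -> bool + Send>>,
        all_ok: bool,
        todo: usize,
        stop: bool,
    }

    #[test]
    fn master_pushes_batch_and_workers_drain_it() {
        // (state, worker_cv, master_cv), mirroring mutex_ / worker_cv / master_cv.
        let shared = Arc::new((
            Mutex::new(Shared { queue: VecDeque::new(), all_ok: true, todo: 0, stop: false }),
            Condvar::new(),
            Condvar::new(),
        ));

        // Two workers: pop a check, run it outside the lock, fold the result
        // into all_ok, and wake the master when the last check completes.
        let mut handles = Vec::new();
        for _ in 0..2 {
            let shared = Arc::clone(&shared);
            handles.push(thread::spawn(move || {
                let (state, worker_cv, master_cv) = &*shared;
                let mut guard = state.lock().unwrap();
                loop {
                    while guard.queue.is_empty() && !guard.stop {
                        guard = worker_cv.wait(guard).unwrap();
                    }
                    if guard.stop {
                        return;
                    }
                    let check = guard.queue.pop_back().unwrap();
                    drop(guard);
                    let ok = check();
                    guard = state.lock().unwrap();
                    guard.all_ok &= ok;
                    guard.todo -= 1;
                    if guard.todo == 0 {
                        master_cv.notify_one();
                    }
                }
            }));
        }

        // Master: enqueue a batch of trivially-true checks, wake the workers,
        // then block on master_cv until the outstanding counter hits zero.
        let (state, worker_cv, master_cv) = &*shared;
        let mut guard = state.lock().unwrap();
        for i in 0..100usize {
            guard.queue.push_back(Box::new(move || i * 2 == i + i));
            guard.todo += 1;
        }
        worker_cv.notify_all();
        while guard.todo != 0 {
            guard = master_cv.wait(guard).unwrap();
        }
        assert!(guard.all_ok);

        // Shut the pool down: set the stop flag, wake everyone, join.
        guard.stop = true;
        worker_cv.notify_all();
        drop(guard);
        for h in handles {
            h.join().unwrap();
        }
    }
}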