bitcoin_scheduler/scheduler.rs
1crate::ix!();
2
3pub fn repeat(
4 s: &mut Scheduler,
5 f: SchedulerFunction,
6 delta: Duration /* millis */) {
7
8 todo!();
9 /*
10 f();
11 s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta);
12 */
13}
14
15//-------------------------------------------[.cpp/bitcoin/src/scheduler.h]
16//-------------------------------------------[.cpp/bitcoin/src/scheduler.cpp]
17
18/**
19 | Simple class for background tasks that
20 | should be run periodically or once "after
21 | a while"
22 |
23 | Usage:
24 |
25 | -----------
26 | @code
27 |
28 | CScheduler* s = new CScheduler();
29 | s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: c_void doSomething() { }
30 | s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3});
31 | std::thread* t = new std::thread([&] { s->serviceQueue(); });
32 |
33 | ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue:
34 | s->stop();
35 | t->join();
36 | delete t;
37 | delete s; // Must be done after thread is interrupted/joined.
38 |
39 */
40pub struct Scheduler {
41 service_thread: Thread,
42 new_task_mutex: RefCell<Mutex<SchedulerInner>>,
43 new_task_scheduled: Condvar,
44}
45
46pub struct SchedulerInner {
47 task_queue: MultiMap<TimePoint,SchedulerFunction>,
48 n_threads_servicing_queue: i32, // default = { 0 }
49 stop_requested: bool, // default = { false }
50 stop_when_empty: bool, // default = { false }
51}
52
53pub type SchedulerFunction = Box<dyn FnMut() -> ()>;
54
55impl Drop for Scheduler {
56 fn drop(&mut self) {
57 todo!();
58 /*
59 assert(nThreadsServicingQueue == 0);
60 if (stopWhenEmpty) assert(taskQueue.empty());
61 */
62 }
63}
64
65impl Scheduler {
66
67 /**
68 | Call f once after the delta has passed
69 |
70 */
71 pub fn schedule_from_now(&mut self,
72 f: SchedulerFunction,
73 delta: Duration /* millis */) {
74
75 todo!();
76 /*
77 schedule(std::move(f), std::chrono::system_clock::now() + delta);
78 */
79 }
80
81 /**
82 | Tell any threads running serviceQueue
83 | to stop as soon as the current task is
84 | done
85 |
86 */
87 pub fn stop(&mut self) {
88
89 todo!();
90 /*
91
92 [&]() { LOCK(newTaskMutex); stopRequested = true }()
93 ;
94 newTaskScheduled.notify_all();
95 if (m_service_thread.joinable()) m_service_thread.join();
96 */
97 }
98
99 /**
100 | Tell any threads running serviceQueue
101 | to stop when there is no work left to be
102 | done
103 |
104 */
105 pub fn stop_when_drained(&mut self) {
106
107 todo!();
108 /*
109
110 [&]() { LOCK(newTaskMutex); stopWhenEmpty = true }()
111 ;
112 newTaskScheduled.notify_all();
113 if (m_service_thread.joinable()) m_service_thread.join();
114 */
115 }
116
117 #[EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)]
118 pub fn should_stop(&self) -> bool {
119
120 todo!();
121 /*
122 return stopRequested || (stopWhenEmpty && taskQueue.empty());
123 */
124 }
125
126 /**
127 | Services the queue 'forever'. Should
128 | be run in a thread.
129 |
130 */
131 pub fn service_queue(&mut self) {
132
133 todo!();
134 /*
135 SetSyscallSandboxPolicy(SyscallSandboxPolicy::SCHEDULER);
136 WAIT_LOCK(newTaskMutex, lock);
137 ++nThreadsServicingQueue;
138
139 // newTaskMutex is locked throughout this loop EXCEPT
140 // when the thread is waiting or when the user's function
141 // is called.
142 while (!shouldStop()) {
143 try {
144 while (!shouldStop() && taskQueue.empty()) {
145 // Wait until there is something to do.
146 newTaskScheduled.wait(lock);
147 }
148
149 // Wait until either there is a new task, or until
150 // the time of the first item on the queue:
151
152 while (!shouldStop() && !taskQueue.empty()) {
153 std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
154 if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
155 break; // Exit loop after timeout, it means we reached the time of the event
156 }
157 }
158
159 // If there are multiple threads, the queue can empty while we're waiting (another
160 // thread may service the task we were waiting on).
161 if (shouldStop() || taskQueue.empty())
162 continue;
163
164 SchedulerFunction f = taskQueue.begin()->second;
165 taskQueue.erase(taskQueue.begin());
166
167 {
168 // Unlock before calling f, so it can reschedule itself or another task
169 // without deadlocking:
170 REVERSE_LOCK(lock);
171 f();
172 }
173 } catch (...) {
174 --nThreadsServicingQueue;
175 throw;
176 }
177 }
178 --nThreadsServicingQueue;
179 newTaskScheduled.notify_one();
180 */
181 }
182
183 /**
184 | Call func at/after time t
185 |
186 */
187 pub fn schedule(&mut self,
188 f: SchedulerFunction,
189 t: TimePoint) {
190
191 todo!();
192 /*
193 {
194 LOCK(newTaskMutex);
195 taskQueue.insert(std::make_pair(t, f));
196 }
197 newTaskScheduled.notify_one();
198 */
199 }
200
201 /**
202 | Mock the scheduler to fast forward in
203 | time.
204 |
205 | Iterates through items on taskQueue
206 | and reschedules them to be delta_seconds
207 | sooner.
208 |
209 */
210 pub fn mock_forward(&mut self, delta_seconds: Duration /* seconds */) {
211
212 todo!();
213 /*
214 assert(delta_seconds > 0s && delta_seconds <= 1h);
215
216 {
217 LOCK(newTaskMutex);
218
219 // use temp_queue to maintain updated schedule
220 std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
221
222 for (const auto& element : taskQueue) {
223 temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
224 }
225
226 // point taskQueue to temp_queue
227 taskQueue = std::move(temp_queue);
228 }
229
230 // notify that the taskQueue needs to be processed
231 newTaskScheduled.notify_one();
232 */
233 }
234
235 /**
236 | Repeat f until the scheduler is stopped.
237 | First run is after delta has passed once.
238 |
239 | The timing is not exact: Every time f
240 | is finished, it is rescheduled to run
241 | again after delta. If you need more accurate
242 | scheduling, don't use this method.
243 |
244 */
245 pub fn schedule_every(&mut self,
246 f: SchedulerFunction,
247 delta: Duration /* millis */) {
248
249 todo!();
250 /*
251 scheduleFromNow([=] { Repeat(*this, f, delta); }, delta);
252 */
253 }
254
255 /**
256 | Returns number of tasks waiting to be
257 | serviced, and first and last task times
258 |
259 */
260 pub fn get_queue_info(&self,
261 first: &mut TimePoint,
262 last: &mut TimePoint) -> usize {
263
264 todo!();
265 /*
266 LOCK(newTaskMutex);
267 size_t result = taskQueue.size();
268 if (!taskQueue.empty()) {
269 first = taskQueue.begin()->first;
270 last = taskQueue.rbegin()->first;
271 }
272 return result;
273 */
274 }
275
276 /**
277 | Returns true if there are threads actively
278 | running in serviceQueue()
279 |
280 */
281 pub fn are_threads_servicing_queue(&self) -> bool {
282
283 todo!();
284 /*
285 LOCK(newTaskMutex);
286 return nThreadsServicingQueue;
287 */
288 }
289}
290
291/**
292 | Class used by CScheduler clients which
293 | may schedule multiple jobs which are
294 | required to be run serially.
295 |
296 | Jobs may not be run on the same thread,
297 | but no two jobs will be executed at the
298 | same time and memory will be release-acquire
299 | consistent (the scheduler will internally
300 | do an acquire before invoking a callback
301 | as well as a release at the end).
302 |
303 | In practice this means that a callback
304 |
305 | B() will be able to observe all of the
306 | effects of callback A() which executed
307 | before it.
308 |
309 */
310pub struct SingleThreadedSchedulerClient {
311 pscheduler: *mut Scheduler,
312 cs_callbacks_pending: parking_lot::ReentrantMutex<single_threaded_scheduler_client::Inner>,
313}
314
315pub mod single_threaded_scheduler_client {
316
317 use super::*;
318
319 pub struct Inner {
320 callbacks_pending: LinkedList<fn() -> ()>,
321 are_callbacks_running: bool, // default = false
322 }
323}
324
325impl SingleThreadedSchedulerClient {
326
327 pub fn new(pscheduler_in: *mut Scheduler) -> Self {
328
329 todo!();
330 /*
331 : pscheduler(pschedulerIn),
332 */
333 }
334
335 pub fn maybe_schedule_process_queue(&mut self) {
336
337 todo!();
338 /*
339 {
340 LOCK(m_cs_callbacks_pending);
341 // Try to avoid scheduling too many copies here, but if we
342 // accidentally have two ProcessQueue's scheduled at once its
343 // not a big deal.
344 if (m_are_callbacks_running) return;
345 if (m_callbacks_pending.empty()) return;
346 }
347 m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
348 */
349 }
350
351 pub fn process_queue(&mut self) {
352
353 todo!();
354 /*
355 std::function<c_void()> callback;
356 {
357 LOCK(m_cs_callbacks_pending);
358 if (m_are_callbacks_running) return;
359 if (m_callbacks_pending.empty()) return;
360 m_are_callbacks_running = true;
361
362 callback = std::move(m_callbacks_pending.front());
363 m_callbacks_pending.pop_front();
364 }
365
366 // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
367 // to ensure both happen safely even if callback() throws.
368 struct RAIICallbacksRunning {
369 SingleThreadedSchedulerClient* instance;
370 explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
371 ~RAIICallbacksRunning()
372 {
373 {
374 LOCK(instance->m_cs_callbacks_pending);
375 instance->m_are_callbacks_running = false;
376 }
377 instance->MaybeScheduleProcessQueue();
378 }
379 } raiicallbacksrunning(this);
380
381 callback();
382 */
383 }
384
385 /**
386 | Add a callback to be executed. Callbacks
387 | are executed serially and memory is
388 | release-acquire consistent between
389 | callback executions.
390 |
391 | Practically, this means that callbacks
392 | can behave as if they are executed in
393 | order by a single thread.
394 |
395 */
396 pub fn add_to_process_queue(&mut self, func: fn() -> ()) {
397
398 todo!();
399 /*
400 assert(m_pscheduler);
401
402 {
403 LOCK(m_cs_callbacks_pending);
404 m_callbacks_pending.emplace_back(std::move(func));
405 }
406 MaybeScheduleProcessQueue();
407 */
408 }
409
410 /**
411 | Processes all remaining queue members
412 | on the calling thread, blocking until
413 | queue is empty
414 |
415 | Must be called after the CScheduler
416 | has no remaining processing threads!
417 |
418 */
419 pub fn empty_queue(&mut self) {
420
421 todo!();
422 /*
423 assert(!m_pscheduler->AreThreadsServicingQueue());
424 bool should_continue = true;
425 while (should_continue) {
426 ProcessQueue();
427 LOCK(m_cs_callbacks_pending);
428 should_continue = !m_callbacks_pending.empty();
429 }
430 */
431 }
432
433 pub fn callbacks_pending(&mut self) -> usize {
434
435 todo!();
436 /*
437 LOCK(m_cs_callbacks_pending);
438 return m_callbacks_pending.size();
439 */
440 }
441}