bitcoin_scheduler/
scheduler.rs

1crate::ix!();
2
3pub fn repeat(
4        s:     &mut Scheduler,
5        f:     SchedulerFunction,
6        delta: Duration /* millis */)  {
7    
8    todo!();
9        /*
10            f();
11        s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta);
12        */
13}
14
15//-------------------------------------------[.cpp/bitcoin/src/scheduler.h]
16//-------------------------------------------[.cpp/bitcoin/src/scheduler.cpp]
17
18/**
19  | Simple class for background tasks that
20  | should be run periodically or once "after
21  | a while"
22  | 
23  | Usage:
24  | 
25  | -----------
26  | @code
27  | 
28  | CScheduler* s = new CScheduler();
29  | s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: c_void doSomething() { }
30  | s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3});
31  | std::thread* t = new std::thread([&] { s->serviceQueue(); });
32  |  
33  | ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue:
34  | s->stop();
35  | t->join();
36  | delete t;
37  | delete s; // Must be done after thread is interrupted/joined.
38  |
39  */
40pub struct Scheduler {
41    service_thread:     Thread,
42    new_task_mutex:     RefCell<Mutex<SchedulerInner>>,
43    new_task_scheduled: Condvar,
44}
45
46pub struct SchedulerInner {
47    task_queue:                MultiMap<TimePoint,SchedulerFunction>,
48    n_threads_servicing_queue: i32, // default = { 0 }
49    stop_requested:            bool, // default = { false }
50    stop_when_empty:           bool, // default = { false }
51}
52
53pub type SchedulerFunction = Box<dyn FnMut() -> ()>;
54
55impl Drop for Scheduler {
56    fn drop(&mut self) {
57        todo!();
58        /*
59            assert(nThreadsServicingQueue == 0);
60        if (stopWhenEmpty) assert(taskQueue.empty());
61        */
62    }
63}
64
65impl Scheduler {
66
67    /**
68      | Call f once after the delta has passed
69      |
70      */
71    pub fn schedule_from_now(&mut self, 
72        f:     SchedulerFunction,
73        delta: Duration /* millis */)  {
74        
75        todo!();
76        /*
77            schedule(std::move(f), std::chrono::system_clock::now() + delta);
78        */
79    }
80
81    /**
82      | Tell any threads running serviceQueue
83      | to stop as soon as the current task is
84      | done
85      |
86      */
87    pub fn stop(&mut self)  {
88        
89        todo!();
90        /*
91            
92        [&]() { LOCK(newTaskMutex);  stopRequested = true }()
93        ;
94            newTaskScheduled.notify_all();
95            if (m_service_thread.joinable()) m_service_thread.join();
96        */
97    }
98
99    /**
100      | Tell any threads running serviceQueue
101      | to stop when there is no work left to be
102      | done
103      |
104      */
105    pub fn stop_when_drained(&mut self)  {
106        
107        todo!();
108        /*
109            
110        [&]() { LOCK(newTaskMutex);  stopWhenEmpty = true }()
111        ;
112            newTaskScheduled.notify_all();
113            if (m_service_thread.joinable()) m_service_thread.join();
114        */
115    }
116
117    #[EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)]
118    pub fn should_stop(&self) -> bool {
119        
120        todo!();
121        /*
122            return stopRequested || (stopWhenEmpty && taskQueue.empty());
123        */
124    }
125    
126    /**
127      | Services the queue 'forever'. Should
128      | be run in a thread.
129      |
130      */
131    pub fn service_queue(&mut self)  {
132        
133        todo!();
134        /*
135            SetSyscallSandboxPolicy(SyscallSandboxPolicy::SCHEDULER);
136        WAIT_LOCK(newTaskMutex, lock);
137        ++nThreadsServicingQueue;
138
139        // newTaskMutex is locked throughout this loop EXCEPT
140        // when the thread is waiting or when the user's function
141        // is called.
142        while (!shouldStop()) {
143            try {
144                while (!shouldStop() && taskQueue.empty()) {
145                    // Wait until there is something to do.
146                    newTaskScheduled.wait(lock);
147                }
148
149                // Wait until either there is a new task, or until
150                // the time of the first item on the queue:
151
152                while (!shouldStop() && !taskQueue.empty()) {
153                    std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
154                    if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
155                        break; // Exit loop after timeout, it means we reached the time of the event
156                    }
157                }
158
159                // If there are multiple threads, the queue can empty while we're waiting (another
160                // thread may service the task we were waiting on).
161                if (shouldStop() || taskQueue.empty())
162                    continue;
163
164                SchedulerFunction f = taskQueue.begin()->second;
165                taskQueue.erase(taskQueue.begin());
166
167                {
168                    // Unlock before calling f, so it can reschedule itself or another task
169                    // without deadlocking:
170                    REVERSE_LOCK(lock);
171                    f();
172                }
173            } catch (...) {
174                --nThreadsServicingQueue;
175                throw;
176            }
177        }
178        --nThreadsServicingQueue;
179        newTaskScheduled.notify_one();
180        */
181    }
182    
183    /**
184      | Call func at/after time t
185      |
186      */
187    pub fn schedule(&mut self, 
188        f: SchedulerFunction,
189        t: TimePoint)  {
190        
191        todo!();
192        /*
193            {
194            LOCK(newTaskMutex);
195            taskQueue.insert(std::make_pair(t, f));
196        }
197        newTaskScheduled.notify_one();
198        */
199    }
200    
201    /**
202      | Mock the scheduler to fast forward in
203      | time.
204      | 
205      | Iterates through items on taskQueue
206      | and reschedules them to be delta_seconds
207      | sooner.
208      |
209      */
210    pub fn mock_forward(&mut self, delta_seconds: Duration /* seconds */)  {
211        
212        todo!();
213        /*
214            assert(delta_seconds > 0s && delta_seconds <= 1h);
215
216        {
217            LOCK(newTaskMutex);
218
219            // use temp_queue to maintain updated schedule
220            std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
221
222            for (const auto& element : taskQueue) {
223                temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
224            }
225
226            // point taskQueue to temp_queue
227            taskQueue = std::move(temp_queue);
228        }
229
230        // notify that the taskQueue needs to be processed
231        newTaskScheduled.notify_one();
232        */
233    }
234    
235    /**
236      | Repeat f until the scheduler is stopped.
237      | First run is after delta has passed once.
238      | 
239      | The timing is not exact: Every time f
240      | is finished, it is rescheduled to run
241      | again after delta. If you need more accurate
242      | scheduling, don't use this method.
243      |
244      */
245    pub fn schedule_every(&mut self, 
246        f:     SchedulerFunction,
247        delta: Duration /* millis */)  {
248        
249        todo!();
250        /*
251            scheduleFromNow([=] { Repeat(*this, f, delta); }, delta);
252        */
253    }
254    
255    /**
256      | Returns number of tasks waiting to be
257      | serviced, and first and last task times
258      |
259      */
260    pub fn get_queue_info(&self, 
261        first: &mut TimePoint,
262        last:  &mut TimePoint) -> usize {
263        
264        todo!();
265        /*
266            LOCK(newTaskMutex);
267        size_t result = taskQueue.size();
268        if (!taskQueue.empty()) {
269            first = taskQueue.begin()->first;
270            last = taskQueue.rbegin()->first;
271        }
272        return result;
273        */
274    }
275    
276    /**
277      | Returns true if there are threads actively
278      | running in serviceQueue()
279      |
280      */
281    pub fn are_threads_servicing_queue(&self) -> bool {
282        
283        todo!();
284        /*
285            LOCK(newTaskMutex);
286        return nThreadsServicingQueue;
287        */
288    }
289}
290
291/**
292  | Class used by CScheduler clients which
293  | may schedule multiple jobs which are
294  | required to be run serially.
295  | 
296  | Jobs may not be run on the same thread,
297  | but no two jobs will be executed at the
298  | same time and memory will be release-acquire
299  | consistent (the scheduler will internally
300  | do an acquire before invoking a callback
301  | as well as a release at the end).
302  | 
303  | In practice this means that a callback
304  | 
305  | B() will be able to observe all of the
306  | effects of callback A() which executed
307  | before it.
308  |
309  */
310pub struct SingleThreadedSchedulerClient {
311    pscheduler:            *mut Scheduler,
312    cs_callbacks_pending:  parking_lot::ReentrantMutex<single_threaded_scheduler_client::Inner>,
313}
314
315pub mod single_threaded_scheduler_client {
316
317    use super::*;
318
319    pub struct Inner {
320        callbacks_pending:     LinkedList<fn() -> ()>,
321        are_callbacks_running: bool, // default = false
322    }
323}
324
325impl SingleThreadedSchedulerClient {
326    
327    pub fn new(pscheduler_in: *mut Scheduler) -> Self {
328    
329        todo!();
330        /*
331        : pscheduler(pschedulerIn),
332        */
333    }
334
335    pub fn maybe_schedule_process_queue(&mut self)  {
336        
337        todo!();
338        /*
339            {
340            LOCK(m_cs_callbacks_pending);
341            // Try to avoid scheduling too many copies here, but if we
342            // accidentally have two ProcessQueue's scheduled at once its
343            // not a big deal.
344            if (m_are_callbacks_running) return;
345            if (m_callbacks_pending.empty()) return;
346        }
347        m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
348        */
349    }
350    
351    pub fn process_queue(&mut self)  {
352        
353        todo!();
354        /*
355            std::function<c_void()> callback;
356        {
357            LOCK(m_cs_callbacks_pending);
358            if (m_are_callbacks_running) return;
359            if (m_callbacks_pending.empty()) return;
360            m_are_callbacks_running = true;
361
362            callback = std::move(m_callbacks_pending.front());
363            m_callbacks_pending.pop_front();
364        }
365
366        // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
367        // to ensure both happen safely even if callback() throws.
368        struct RAIICallbacksRunning {
369            SingleThreadedSchedulerClient* instance;
370            explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
371            ~RAIICallbacksRunning()
372            {
373                {
374                    LOCK(instance->m_cs_callbacks_pending);
375                    instance->m_are_callbacks_running = false;
376                }
377                instance->MaybeScheduleProcessQueue();
378            }
379        } raiicallbacksrunning(this);
380
381        callback();
382        */
383    }
384    
385    /**
386      | Add a callback to be executed. Callbacks
387      | are executed serially and memory is
388      | release-acquire consistent between
389      | callback executions.
390      | 
391      | Practically, this means that callbacks
392      | can behave as if they are executed in
393      | order by a single thread.
394      |
395      */
396    pub fn add_to_process_queue(&mut self, func: fn() -> ())  {
397        
398        todo!();
399        /*
400            assert(m_pscheduler);
401
402        {
403            LOCK(m_cs_callbacks_pending);
404            m_callbacks_pending.emplace_back(std::move(func));
405        }
406        MaybeScheduleProcessQueue();
407        */
408    }
409    
410    /**
411      | Processes all remaining queue members
412      | on the calling thread, blocking until
413      | queue is empty
414      | 
415      | Must be called after the CScheduler
416      | has no remaining processing threads!
417      |
418      */
419    pub fn empty_queue(&mut self)  {
420        
421        todo!();
422        /*
423            assert(!m_pscheduler->AreThreadsServicingQueue());
424        bool should_continue = true;
425        while (should_continue) {
426            ProcessQueue();
427            LOCK(m_cs_callbacks_pending);
428            should_continue = !m_callbacks_pending.empty();
429        }
430        */
431    }
432    
433    pub fn callbacks_pending(&mut self) -> usize {
434        
435        todo!();
436        /*
437            LOCK(m_cs_callbacks_pending);
438        return m_callbacks_pending.size();
439        */
440    }
441}