// generic_bnp/misc/blocking_queue.rs

1/////////////////////////////////////////////////////////////
2// rust_blocking_queue::lib.rs - BlockingQueue             //
3//                                                         //
4// Jim Fawcett, https://JimFawcett.github.io, 19 May 2020  //
5/////////////////////////////////////////////////////////////
6/*
7   This is a BlockingQueue abstraction.  To be shared between
8   threads, without using unsafe code, any abstraction must
9   be composed only of Mutexes and Condvars or a struct or
10   tuple with only those members.
11   That means that the blocking queue must hold its native
12   queue in a Mutex, as shown below.
13
14   There is another alternative, based on Rust channels, which
15   are essentially blocking queues.
16*/
17#![allow(dead_code)]
18
19use binary_heap_plus::{BinaryHeap};
20use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
21use std::sync::*;
22use crate::branch::Branch;
23use crate::{IBranchFilter, UserDualStore};
24use compare::Compare;
25
26
27
28
29/// Thread-safe queue that blocks de_q on empty
30pub struct BlockingQueue<T, C, DualStore>
31where
32    T: IBranchFilter,
33    DualStore : UserDualStore,
34C : Compare<Branch<T,DualStore>>  + Clone
35
36{
37    queue: Mutex<BinaryHeap<Branch<T,DualStore>, C>>,
38    did_swap_priority: AtomicBool,
39    flushing : AtomicBool,
40    in_progress: AtomicU8,
41    condvar: Condvar,
42    compare_bound : C,
43    compare_without_bound : C
44}
45
46impl<T: IBranchFilter, C : Compare<Branch<T,DualStore >> + Clone, DualStore : UserDualStore> BlockingQueue<T , C, DualStore>
47
48{
49    /// Create empty blocking queue
50    pub fn new(compare_bound :  C, compare_without_bound : C) -> Self {
51
52
53        let heap  = BinaryHeap::from_vec_cmp(vec![] as Vec<Branch<T, DualStore>>, compare_without_bound.clone());
54
55
56
57        Self {
58            queue: Mutex::new(heap),
59            in_progress: AtomicU8::new(0),
60            condvar: Condvar::new(),
61            did_swap_priority: AtomicBool::new(false),
62            compare_bound,
63            compare_without_bound,
64            flushing : false.into()
65
66        }
67    }
68
69    pub fn now_has_bound(&self) {
70
71        // only swap if not previously
72        if !self.did_swap_priority.load(self::Ordering::Relaxed) {
73            self.did_swap_priority.store(true, self::Ordering::SeqCst);
74            let mut lq = self.queue.lock().unwrap();
75            lq.replace_cmp(self.compare_bound.clone());
76        }
77    }
78
79    /// push input on back of queue
80    /// - unrecoverable if lock fails so just unwrap
81    pub fn add_job(&self, t: Branch<T, DualStore>) {
82
83        if self.flushing.load(Ordering::SeqCst) { return;}
84        let mut lq = self.queue.lock().unwrap();
85        lq.push(t);
86        self.condvar.notify_one();
87    }
88    /// pop element from front of queue
89    /// - unrecoverable if lock fails so just unwrap
90    /// - same for condition variable
91    pub fn get_job(&self) -> Option<Branch<T, DualStore>> {
92        let mut lq = self.queue.lock().unwrap();
93        // if the queue is empty we wait, but only if there are no running jobs
94        while lq.len() == 0 && self.in_progress.load(Ordering::SeqCst) > 0 && !self.flushing.load(Ordering::SeqCst) {
95            lq = self.condvar.wait(lq).unwrap();
96        }
97
98        if self.flushing.load(Ordering::SeqCst) { return None}
99
100        let job = lq.pop();
101        if job.is_some() {
102            self.in_progress.fetch_add(1, Ordering::SeqCst);
103        }
104        job
105    }
106
107    pub fn job_done(&self) {
108        self.in_progress.fetch_sub(1, Ordering::SeqCst);
109        self.condvar.notify_all();
110    }
111
112    pub fn flush_and_terminate(&self) {
113
114        self.flushing.store(true, Ordering::SeqCst);
115        self.in_progress.store(0, Ordering::SeqCst);
116        self.condvar.notify_all();
117    }
118
119    pub fn copy_of_queue(&self) -> Vec<Branch<T, DualStore>> {
120        self.queue.lock().unwrap().clone().into_vec()
121    }
122
123
124    pub fn lowest_bound(&self) -> Option<f64> {
125        self.queue.lock().unwrap().iter().map(|i| i.old_obj_bound).min_by(|a, b| a.partial_cmp(b).unwrap())
126    }
127
128
129    /// return number of elements in queue
130    pub fn len(&self) -> usize {
131        self.queue.lock().unwrap().len()
132    }
133}