generic_bnp/misc/
#![allow(dead_code)]

use binary_heap_plus::BinaryHeap;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::{Condvar, Mutex};

use compare::Compare;

use crate::branch::Branch;
use crate::{IBranchFilter, UserDualStore};

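/// A blocking priority queue of branch-and-bound nodes shared between worker threads.
///
/// Branches are ordered by `compare_without_bound` until a first primal bound is
/// found (`now_has_bound`), after which the heap is re-ordered with `compare_bound`.
/// `get_job`/`job_done` track in-flight work, so workers block while the queue is
/// temporarily empty and only receive `None` once the whole tree is exhausted or
/// `flush_and_terminate` is called.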
pub struct BlockingQueue<T, C, DualStore>
where
    T: IBranchFilter,
    DualStore: UserDualStore,
    C: Compare<Branch<T, DualStore>> + Clone,
{
    /// Pending branches, ordered by the currently active comparator.
    queue: Mutex<BinaryHeap<Branch<T, DualStore>, C>>,
    /// Whether the comparator has already been swapped to `compare_bound`.
    did_swap_priority: AtomicBool,
    /// Set by `flush_and_terminate`: rejects new jobs and unblocks consumers.
    flushing: AtomicBool,
    /// Number of jobs handed out via `get_job` but not yet reported via `job_done`.
    in_progress: AtomicU8,
    /// Signals waiting workers when jobs arrive or the queue state changes.
    condvar: Condvar,
    /// Ordering used once a primal bound is known.
    compare_bound: C,
    /// Ordering used while no bound is available yet.
    compare_without_bound: C,
}

impl<T, C, DualStore> BlockingQueue<T, C, DualStore>
where
    T: IBranchFilter,
    DualStore: UserDualStore,
    C: Compare<Branch<T, DualStore>> + Clone,
{
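    /// Creates an empty queue that initially orders branches with `compare_without_bound`.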
    pub fn new(compare_bound: C, compare_without_bound: C) -> Self {
        // Start with the "no bound yet" ordering; `now_has_bound` swaps it later.
        let heap = BinaryHeap::from_vec_cmp(Vec::new(), compare_without_bound.clone());

        Self {
            queue: Mutex::new(heap),
            in_progress: AtomicU8::new(0),
            condvar: Condvar::new(),
            did_swap_priority: AtomicBool::new(false),
            compare_bound,
            compare_without_bound,
            flushing: AtomicBool::new(false),
        }
    }

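    /// Switches the heap to `compare_bound` ordering once a first primal bound exists.
    /// Only the first call has an effect; later calls are no-ops.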
    pub fn now_has_bound(&self) {
        // `swap` makes the test-and-set atomic, so the comparator is replaced at most
        // once even if several workers report a first bound concurrently.
        if !self.did_swap_priority.swap(true, Ordering::SeqCst) {
            let mut lq = self.queue.lock().unwrap();
            lq.replace_cmp(self.compare_bound.clone());
        }
    }

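    /// Enqueues a branch and wakes one waiting worker. Jobs are silently dropped
    /// while the queue is flushing.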
    pub fn add_job(&self, t: Branch<T, DualStore>) {
        if self.flushing.load(Ordering::SeqCst) {
            return;
        }
        let mut lq = self.queue.lock().unwrap();
        lq.push(t);
        self.condvar.notify_one();
    }
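
    /// Pops the highest-priority branch. Blocks while the queue is empty but other
    /// workers still hold jobs (they may yet enqueue children). Returns `None` once
    /// the search is exhausted or the queue is flushing; on `Some`, the caller must
    /// eventually call `job_done` exactly once.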
    pub fn get_job(&self) -> Option<Branch<T, DualStore>> {
        let mut lq = self.queue.lock().unwrap();
        while lq.is_empty()
            && self.in_progress.load(Ordering::SeqCst) > 0
            && !self.flushing.load(Ordering::SeqCst)
        {
            lq = self.condvar.wait(lq).unwrap();
        }

        if self.flushing.load(Ordering::SeqCst) {
            return None;
        }

        // An empty queue with nothing in progress means the search tree is exhausted.
        let job = lq.pop();
        if job.is_some() {
            self.in_progress.fetch_add(1, Ordering::SeqCst);
        }
        job
    }

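    /// Marks one handed-out job as finished. Call this only after enqueueing any
    /// child branches; otherwise an idle worker could observe an empty queue with
    /// nothing in progress and conclude the search is over.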
    pub fn job_done(&self) {
        self.in_progress.fetch_sub(1, Ordering::SeqCst);
        // Wake everyone: the last job finishing lets idle workers observe termination.
        self.condvar.notify_all();
    }

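    /// Aborts the search: future `add_job` calls are dropped, the in-progress counter
    /// is force-cleared, and every blocked worker is woken so `get_job` returns `None`.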
    pub fn flush_and_terminate(&self) {
        self.flushing.store(true, Ordering::SeqCst);
        self.in_progress.store(0, Ordering::SeqCst);
        self.condvar.notify_all();
    }

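    /// Returns a snapshot of the currently queued branches (clones the whole heap).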
    pub fn copy_of_queue(&self) -> Vec<Branch<T, DualStore>> {
        self.queue.lock().unwrap().clone().into_vec()
    }

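    /// The smallest `old_obj_bound` among queued branches, or `None` if the queue is empty.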
    pub fn lowest_bound(&self) -> Option<f64> {
        self.queue
            .lock()
            .unwrap()
            .iter()
            .map(|i| i.old_obj_bound)
            // `total_cmp` avoids the panic that `partial_cmp(..).unwrap()` hits on NaN.
            .min_by(|a, b| a.total_cmp(b))
    }

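    /// Number of currently queued (not yet handed-out) branches.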
    pub fn len(&self) -> usize {
        self.queue.lock().unwrap().len()
    }
}
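
// Intended worker-loop shape (an illustrative sketch, not part of this file:
// `branch_out` and the concrete `T`, `C`, `DualStore` instantiations are hypothetical):
//
//     while let Some(branch) = queue.get_job() {
//         for child in branch_out(&branch) {
//             queue.add_job(child); // enqueue children first...
//         }
//         queue.job_done(); // ...then release the in-progress slot
//     }
//     // `get_job` returned `None`: tree exhausted or flush requested.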