jwalk_meta/core/
ordered_queue.rs1use crossbeam::channel::{self, Receiver, SendError, Sender, TryRecvError};
4use std::collections::BinaryHeap;
5use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
6use std::thread;
7
8use super::*;
9
10pub(crate) struct OrderedQueue<T>
11where
12 T: Send,
13{
14 sender: Sender<Ordered<T>>,
15 pending_count: Arc<AtomicUsize>,
16 stop: Arc<AtomicBool>,
17}
18
/// Selects how an `OrderedQueueIter` hands out the items it receives.
///
/// The iterator dispatches on this value in `Iterator::next`:
/// `Relaxed` uses `try_next_relaxed`, `Strict` uses `try_next_strict`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Ordering {
    /// Yield whichever buffered item is currently at the top of the receive
    /// buffer, without waiting for a specific index path.
    Relaxed,
    /// Yield an item only once its index path equals the one the iterator's
    /// matcher is currently looking for.
    Strict,
}
23
24pub struct OrderedQueueIter<T>
25where
26 T: Send,
27{
28 ordering: Ordering,
29 stop: Arc<AtomicBool>,
30 receiver: Receiver<Ordered<T>>,
31 receive_buffer: BinaryHeap<Ordered<T>>,
32 pending_count: Arc<AtomicUsize>,
33 ordered_matcher: OrderedMatcher,
34}
35
/// Cursor used by the strict ordering mode to decide which item comes next.
///
/// `try_next_strict` compares the head of the receive buffer against
/// `looking_for`, and calls `advance_past` after yielding an item.
struct OrderedMatcher {
    /// Index path of the item that should be yielded next. An empty path
    /// makes `is_none` return `true`.
    looking_for: IndexPath,
    /// Stack of counters manipulated by `advance_past`: the top entry is
    /// decremented for every yielded item, and an item's `child_count` is
    /// pushed when it is greater than zero.
    child_count_stack: Vec<usize>,
}
40
41pub(crate) fn new_ordered_queue<T>(
42 stop: Arc<AtomicBool>,
43 ordering: Ordering,
44) -> (OrderedQueue<T>, OrderedQueueIter<T>)
45where
46 T: Send,
47{
48 let pending_count = Arc::new(AtomicUsize::new(0));
49 let (sender, receiver) = channel::unbounded();
50 (
51 OrderedQueue {
52 sender,
53 pending_count: pending_count.clone(),
54 stop: stop.clone(),
55 },
56 OrderedQueueIter {
57 ordering,
58 receiver,
59 ordered_matcher: OrderedMatcher::default(),
60 receive_buffer: BinaryHeap::new(),
61 pending_count,
62 stop,
63 },
64 )
65}
66
67impl<T> OrderedQueue<T>
68where
69 T: Send,
70{
71 pub fn push(&self, ordered: Ordered<T>) -> Result<(), SendError<Ordered<T>>> {
72 self.pending_count.fetch_add(1, AtomicOrdering::SeqCst);
73 self.sender.send(ordered)
74 }
75
76 pub fn complete_item(&self) {
77 self.pending_count.fetch_sub(1, AtomicOrdering::SeqCst);
78 }
79}
80
81impl<T> Clone for OrderedQueue<T>
82where
83 T: Send,
84{
85 fn clone(&self) -> Self {
86 OrderedQueue {
87 sender: self.sender.clone(),
88 pending_count: self.pending_count.clone(),
89 stop: self.stop.clone(),
90 }
91 }
92}
93
94impl<T> OrderedQueueIter<T>
95where
96 T: Send,
97{
98 fn pending_count(&self) -> usize {
99 self.pending_count.load(AtomicOrdering::SeqCst)
100 }
101
102 fn is_stop(&self) -> bool {
103 self.stop.load(AtomicOrdering::SeqCst)
104 }
105
106 fn try_next_relaxed(&mut self) -> Result<Ordered<T>, TryRecvError> {
107 if self.is_stop() {
108 return Err(TryRecvError::Disconnected);
109 }
110
111 while let Ok(ordered_work) = self.receiver.try_recv() {
112 self.receive_buffer.push(ordered_work)
113 }
114
115 if let Some(ordered_work) = self.receive_buffer.pop() {
116 Ok(ordered_work)
117 } else if self.pending_count() == 0 {
118 Err(TryRecvError::Disconnected)
119 } else {
120 Err(TryRecvError::Empty)
121 }
122 }
123
124 fn try_next_strict(&mut self) -> Result<Ordered<T>, TryRecvError> {
125 let looking_for = &self.ordered_matcher.looking_for;
126
127 loop {
128 if self.is_stop() {
129 return Err(TryRecvError::Disconnected);
130 }
131
132 let top_ordered = self.receive_buffer.peek();
133 if let Some(top_ordered) = top_ordered {
134 if top_ordered.index_path.eq(looking_for) {
135 break;
136 }
137 }
138
139 if self.ordered_matcher.is_none() {
140 return Err(TryRecvError::Disconnected);
141 }
142
143 match self.receiver.try_recv() {
144 Ok(ordered) => {
145 self.receive_buffer.push(ordered);
146 }
147 Err(err) => match err {
148 TryRecvError::Empty => thread::yield_now(),
149 TryRecvError::Disconnected => break,
150 },
151 }
152 }
153
154 let ordered = self.receive_buffer.pop().unwrap();
155 self.ordered_matcher.advance_past(&ordered);
156 Ok(ordered)
157 }
158}
159
160impl<T> Iterator for OrderedQueueIter<T>
161where
162 T: Send,
163{
164 type Item = Ordered<T>;
165 fn next(&mut self) -> Option<Ordered<T>> {
166 loop {
167 let try_next = match self.ordering {
168 Ordering::Relaxed => self.try_next_relaxed(),
169 Ordering::Strict => self.try_next_strict(),
170 };
171 match try_next {
172 Ok(next) => {
173 return Some(next);
174 }
175 Err(err) => match err {
176 TryRecvError::Empty => thread::yield_now(),
177 TryRecvError::Disconnected => return None,
178 },
179 }
180 }
181 }
182}
183
184impl OrderedMatcher {
185 fn is_none(&self) -> bool {
186 self.looking_for.is_empty()
187 }
188
189 fn decrement_remaining_children(&mut self) {
190 *self.child_count_stack.last_mut().unwrap() -= 1;
191 }
192
193 fn advance_past<T>(&mut self, ordered: &Ordered<T>) {
194 self.decrement_remaining_children();
195
196 if ordered.child_count > 0 {
197 self.looking_for.push(0);
198 self.child_count_stack.push(ordered.child_count);
199 } else {
200 self.looking_for.increment_last();
201 while !self.child_count_stack.is_empty() && *self.child_count_stack.last().unwrap() == 0
202 {
203 self.looking_for.pop();
204 self.child_count_stack.pop();
205 if !self.looking_for.is_empty() {
206 self.looking_for.increment_last();
207 }
208 }
209 }
210 }
211}
212
213impl Default for OrderedMatcher {
214 fn default() -> OrderedMatcher {
215 OrderedMatcher {
216 looking_for: IndexPath::new(vec![0]),
217 child_count_stack: vec![1],
218 }
219 }
220}