jwalk_meta/core/
ordered_queue.rs

1//! Ordered queue backed by a channel.
2
3use crossbeam::channel::{self, Receiver, SendError, Sender, TryRecvError};
4use std::collections::BinaryHeap;
5use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
6use std::thread;
7
8use super::*;
9
10pub(crate) struct OrderedQueue<T>
11where
12    T: Send,
13{
14    sender: Sender<Ordered<T>>,
15    pending_count: Arc<AtomicUsize>,
16    stop: Arc<AtomicBool>,
17}
18
19pub enum Ordering {
20    Relaxed,
21    Strict,
22}
23
24pub struct OrderedQueueIter<T>
25where
26    T: Send,
27{
28    ordering: Ordering,
29    stop: Arc<AtomicBool>,
30    receiver: Receiver<Ordered<T>>,
31    receive_buffer: BinaryHeap<Ordered<T>>,
32    pending_count: Arc<AtomicUsize>,
33    ordered_matcher: OrderedMatcher,
34}
35
36struct OrderedMatcher {
37    looking_for: IndexPath,
38    child_count_stack: Vec<usize>,
39}
40
41pub(crate) fn new_ordered_queue<T>(
42    stop: Arc<AtomicBool>,
43    ordering: Ordering,
44) -> (OrderedQueue<T>, OrderedQueueIter<T>)
45where
46    T: Send,
47{
48    let pending_count = Arc::new(AtomicUsize::new(0));
49    let (sender, receiver) = channel::unbounded();
50    (
51        OrderedQueue {
52            sender,
53            pending_count: pending_count.clone(),
54            stop: stop.clone(),
55        },
56        OrderedQueueIter {
57            ordering,
58            receiver,
59            ordered_matcher: OrderedMatcher::default(),
60            receive_buffer: BinaryHeap::new(),
61            pending_count,
62            stop,
63        },
64    )
65}
66
67impl<T> OrderedQueue<T>
68where
69    T: Send,
70{
71    pub fn push(&self, ordered: Ordered<T>) -> Result<(), SendError<Ordered<T>>> {
72        self.pending_count.fetch_add(1, AtomicOrdering::SeqCst);
73        self.sender.send(ordered)
74    }
75
76    pub fn complete_item(&self) {
77        self.pending_count.fetch_sub(1, AtomicOrdering::SeqCst);
78    }
79}
80
81impl<T> Clone for OrderedQueue<T>
82where
83    T: Send,
84{
85    fn clone(&self) -> Self {
86        OrderedQueue {
87            sender: self.sender.clone(),
88            pending_count: self.pending_count.clone(),
89            stop: self.stop.clone(),
90        }
91    }
92}
93
94impl<T> OrderedQueueIter<T>
95where
96    T: Send,
97{
98    fn pending_count(&self) -> usize {
99        self.pending_count.load(AtomicOrdering::SeqCst)
100    }
101
102    fn is_stop(&self) -> bool {
103        self.stop.load(AtomicOrdering::SeqCst)
104    }
105
106    fn try_next_relaxed(&mut self) -> Result<Ordered<T>, TryRecvError> {
107        if self.is_stop() {
108            return Err(TryRecvError::Disconnected);
109        }
110
111        while let Ok(ordered_work) = self.receiver.try_recv() {
112            self.receive_buffer.push(ordered_work)
113        }
114
115        if let Some(ordered_work) = self.receive_buffer.pop() {
116            Ok(ordered_work)
117        } else if self.pending_count() == 0 {
118            Err(TryRecvError::Disconnected)
119        } else {
120            Err(TryRecvError::Empty)
121        }
122    }
123
124    fn try_next_strict(&mut self) -> Result<Ordered<T>, TryRecvError> {
125        let looking_for = &self.ordered_matcher.looking_for;
126
127        loop {
128            if self.is_stop() {
129                return Err(TryRecvError::Disconnected);
130            }
131
132            let top_ordered = self.receive_buffer.peek();
133            if let Some(top_ordered) = top_ordered {
134                if top_ordered.index_path.eq(looking_for) {
135                    break;
136                }
137            }
138
139            if self.ordered_matcher.is_none() {
140                return Err(TryRecvError::Disconnected);
141            }
142
143            match self.receiver.try_recv() {
144                Ok(ordered) => {
145                    self.receive_buffer.push(ordered);
146                }
147                Err(err) => match err {
148                    TryRecvError::Empty => thread::yield_now(),
149                    TryRecvError::Disconnected => break,
150                },
151            }
152        }
153
154        let ordered = self.receive_buffer.pop().unwrap();
155        self.ordered_matcher.advance_past(&ordered);
156        Ok(ordered)
157    }
158}
159
160impl<T> Iterator for OrderedQueueIter<T>
161where
162    T: Send,
163{
164    type Item = Ordered<T>;
165    fn next(&mut self) -> Option<Ordered<T>> {
166        loop {
167            let try_next = match self.ordering {
168                Ordering::Relaxed => self.try_next_relaxed(),
169                Ordering::Strict => self.try_next_strict(),
170            };
171            match try_next {
172                Ok(next) => {
173                    return Some(next);
174                }
175                Err(err) => match err {
176                    TryRecvError::Empty => thread::yield_now(),
177                    TryRecvError::Disconnected => return None,
178                },
179            }
180        }
181    }
182}
183
184impl OrderedMatcher {
185    fn is_none(&self) -> bool {
186        self.looking_for.is_empty()
187    }
188
189    fn decrement_remaining_children(&mut self) {
190        *self.child_count_stack.last_mut().unwrap() -= 1;
191    }
192
193    fn advance_past<T>(&mut self, ordered: &Ordered<T>) {
194        self.decrement_remaining_children();
195
196        if ordered.child_count > 0 {
197            self.looking_for.push(0);
198            self.child_count_stack.push(ordered.child_count);
199        } else {
200            self.looking_for.increment_last();
201            while !self.child_count_stack.is_empty() && *self.child_count_stack.last().unwrap() == 0
202            {
203                self.looking_for.pop();
204                self.child_count_stack.pop();
205                if !self.looking_for.is_empty() {
206                    self.looking_for.increment_last();
207                }
208            }
209        }
210    }
211}
212
213impl Default for OrderedMatcher {
214    fn default() -> OrderedMatcher {
215        OrderedMatcher {
216            looking_for: IndexPath::new(vec![0]),
217            child_count_stack: vec![1],
218        }
219    }
220}