worker_synchronizer/
utils.rs1use std::collections::{BinaryHeap, HashSet, VecDeque};
2
3use crate::{GetUpdate, Unblocked};
4
5pub struct HashSetDedupOnUpdate<GU: GetUpdate> {
6 inner: GU,
7 history: HashSet<GU::U>,
8}
9
10impl<GU: GetUpdate> HashSetDedupOnUpdate<GU> {
11 pub fn new(gu: GU) -> Self {
12 Self {
13 inner: gu,
14 history: Default::default(),
15 }
16 }
17}
18
19impl<GU: GetUpdate> GetUpdate for HashSetDedupOnUpdate<GU>
20where
21 GU::U: Clone + std::hash::Hash + Eq,
22{
23 type G = GU::G;
24
25 type U = GU::U;
26
27 fn get(&mut self) -> Option<Self::G> {
28 self.inner.get()
29 }
30
31 fn update(&mut self, u: Self::U) -> Unblocked {
32 if self.history.insert(u.clone()) {
33 self.inner.update(u)
34 } else {
35 Unblocked::Zero
36 }
37 }
38
39 fn update_multiples(&mut self, us: impl IntoIterator<Item = Self::U>) -> Unblocked {
40 self.inner
41 .update_multiples(us.into_iter().filter(|u| self.history.insert(u.clone())))
42 }
43}
44
45pub struct HashSetDedupOnGet<GU: GetUpdate> {
46 inner: GU,
47 history: HashSet<GU::G>,
48}
49
50impl<GU: GetUpdate> GetUpdate for HashSetDedupOnGet<GU>
51where
52 GU::G: Clone + std::hash::Hash + Eq,
53{
54 type G = GU::G;
55
56 type U = GU::U;
57
58 fn get(&mut self) -> Option<Self::G> {
59 loop {
60 let candidate = self.inner.get()?;
61 if self.history.insert(candidate.clone()) {
62 return Some(candidate);
63 }
64 }
65 }
66
67 fn update(&mut self, u: Self::U) -> Unblocked {
68 self.inner.update(u)
69 }
70
71 fn update_multiples(&mut self, us: impl IntoIterator<Item = Self::U>) -> Unblocked {
72 self.inner.update_multiples(us)
73 }
74}
75
76impl<GU: GetUpdate> HashSetDedupOnGet<GU> {
77 pub fn new(gu: GU) -> Self {
78 Self {
79 inner: gu,
80 history: Default::default(),
81 }
82 }
83}
84
85impl<T> GetUpdate for Vec<T> {
86 type G = T;
87
88 type U = T;
89
90 fn get(&mut self) -> Option<Self::G> {
91 self.pop()
92 }
93
94 fn update(&mut self, u: Self::U) -> Unblocked {
95 self.push(u);
96 Unblocked::One
97 }
98
99 fn update_multiples(&mut self, us: impl IntoIterator<Item = Self::U>) -> Unblocked {
100 let it = us.into_iter();
101 let (_, ub) = it.size_hint();
102 self.extend(it);
103 Unblocked::from_size_hint_ub(ub)
104 }
105}
106
107impl<T> GetUpdate for VecDeque<T> {
108 type G = T;
109
110 type U = T;
111
112 fn get(&mut self) -> Option<Self::G> {
113 self.pop_front()
114 }
115
116 fn update(&mut self, u: Self::U) -> Unblocked {
117 self.push_back(u);
118 Unblocked::One
119 }
120
121 fn update_multiples(&mut self, us: impl IntoIterator<Item = Self::U>) -> Unblocked {
122 let it = us.into_iter();
123 let (_, ub) = it.size_hint();
124 self.extend(it);
125 Unblocked::from_size_hint_ub(ub)
126 }
127}
128
/// An element paired with a priority.
///
/// All comparison traits below look at `prio` only; `elem` never takes part
/// in ordering or equality, so `T` needs no trait bounds.
struct Prioritized<Prio, T> {
    /// Key used for every comparison.
    prio: Prio,
    /// Payload carried alongside the priority.
    elem: T,
}

/// Builds a `Prioritized` from a `(priority, element)` pair.
impl<Prio, T> From<(Prio, T)> for Prioritized<Prio, T> {
    fn from(pair: (Prio, T)) -> Self {
        let (prio, elem) = pair;
        Prioritized { prio, elem }
    }
}

/// Splits a `Prioritized` back into its `(priority, element)` pair.
impl<Prio, T> From<Prioritized<Prio, T>> for (Prio, T) {
    fn from(entry: Prioritized<Prio, T>) -> Self {
        (entry.prio, entry.elem)
    }
}

/// Total order by priority alone.
impl<Prio: Ord, T> Ord for Prioritized<Prio, T> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        Ord::cmp(&self.prio, &other.prio)
    }
}

/// Partial order by priority alone; available even when `Prio` is only
/// `PartialOrd`.
impl<Prio: PartialOrd, T> PartialOrd for Prioritized<Prio, T> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        PartialOrd::partial_cmp(&self.prio, &other.prio)
    }
}

impl<Prio: Eq, T> Eq for Prioritized<Prio, T> {}

/// Two entries are equal when their priorities are equal, whatever the
/// elements are.
impl<Prio: PartialEq, T> PartialEq for Prioritized<Prio, T> {
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.prio, &other.prio)
    }
}
165
166pub struct PrioQueue<Prio, T>(BinaryHeap<Prioritized<Prio, T>>);
167
168impl<Prio: Ord, T> GetUpdate for PrioQueue<Prio, T> {
169 type G = T;
170
171 type U = (Prio, T);
172
173 fn get(&mut self) -> Option<Self::G> {
174 let tmp: (Prio, Self::G) = self.0.pop()?.into();
175 Some(tmp.1)
176 }
177
178 fn update(&mut self, u: Self::U) -> Unblocked {
179 self.0.push(u.into());
180 Unblocked::One
181 }
182
183 fn update_multiples(&mut self, us: impl IntoIterator<Item = Self::U>) -> Unblocked {
184 let it = us.into_iter();
185 let (_, ub) = it.size_hint();
186 self.0.extend(it.map(Prioritized::from));
187 Unblocked::from_size_hint_ub(ub)
188 }
189}