worker_synchronizer/
utils.rs

1use std::collections::{BinaryHeap, HashSet, VecDeque};
2
3use crate::{GetUpdate, Unblocked};
4
5pub struct HashSetDedupOnUpdate<GU: GetUpdate> {
6    inner: GU,
7    history: HashSet<GU::U>,
8}
9
10impl<GU: GetUpdate> HashSetDedupOnUpdate<GU> {
11    pub fn new(gu: GU) -> Self {
12        Self {
13            inner: gu,
14            history: Default::default(),
15        }
16    }
17}
18
19impl<GU: GetUpdate> GetUpdate for HashSetDedupOnUpdate<GU>
20where
21    GU::U: Clone + std::hash::Hash + Eq,
22{
23    type G = GU::G;
24
25    type U = GU::U;
26
27    fn get(&mut self) -> Option<Self::G> {
28        self.inner.get()
29    }
30
31    fn update(&mut self, u: Self::U) -> Unblocked {
32        if self.history.insert(u.clone()) {
33            self.inner.update(u)
34        } else {
35            Unblocked::Zero
36        }
37    }
38
39    fn update_multiples(&mut self, us: impl IntoIterator<Item = Self::U>) -> Unblocked {
40        self.inner
41            .update_multiples(us.into_iter().filter(|u| self.history.insert(u.clone())))
42    }
43}
44
45pub struct HashSetDedupOnGet<GU: GetUpdate> {
46    inner: GU,
47    history: HashSet<GU::G>,
48}
49
50impl<GU: GetUpdate> GetUpdate for HashSetDedupOnGet<GU>
51where
52    GU::G: Clone + std::hash::Hash + Eq,
53{
54    type G = GU::G;
55
56    type U = GU::U;
57
58    fn get(&mut self) -> Option<Self::G> {
59        loop {
60            let candidate = self.inner.get()?;
61            if self.history.insert(candidate.clone()) {
62                return Some(candidate);
63            }
64        }
65    }
66
67    fn update(&mut self, u: Self::U) -> Unblocked {
68        self.inner.update(u)
69    }
70
71    fn update_multiples(&mut self, us: impl IntoIterator<Item = Self::U>) -> Unblocked {
72        self.inner.update_multiples(us)
73    }
74}
75
76impl<GU: GetUpdate> HashSetDedupOnGet<GU> {
77    pub fn new(gu: GU) -> Self {
78        Self {
79            inner: gu,
80            history: Default::default(),
81        }
82    }
83}
84
85impl<T> GetUpdate for Vec<T> {
86    type G = T;
87
88    type U = T;
89
90    fn get(&mut self) -> Option<Self::G> {
91        self.pop()
92    }
93
94    fn update(&mut self, u: Self::U) -> Unblocked {
95        self.push(u);
96        Unblocked::One
97    }
98
99    fn update_multiples(&mut self, us: impl IntoIterator<Item = Self::U>) -> Unblocked {
100        let it = us.into_iter();
101        let (_, ub) = it.size_hint();
102        self.extend(it);
103        Unblocked::from_size_hint_ub(ub)
104    }
105}
106
107impl<T> GetUpdate for VecDeque<T> {
108    type G = T;
109
110    type U = T;
111
112    fn get(&mut self) -> Option<Self::G> {
113        self.pop_front()
114    }
115
116    fn update(&mut self, u: Self::U) -> Unblocked {
117        self.push_back(u);
118        Unblocked::One
119    }
120
121    fn update_multiples(&mut self, us: impl IntoIterator<Item = Self::U>) -> Unblocked {
122        let it = us.into_iter();
123        let (_, ub) = it.size_hint();
124        self.extend(it);
125        Unblocked::from_size_hint_ub(ub)
126    }
127}
128
/// Pairs a payload with the priority it is ordered by.
///
/// Every comparison trait implemented below looks only at `prio`; `elem` never takes part in
/// equality or ordering, so two entries with equal priorities compare as equal whatever they carry.
struct Prioritized<Prio, T> {
    /// Key used for all comparisons.
    prio: Prio,
    /// Payload carried alongside the priority.
    elem: T,
}

/// Builds an entry from a `(priority, payload)` tuple.
impl<Prio, T> From<(Prio, T)> for Prioritized<Prio, T> {
    fn from(pair: (Prio, T)) -> Self {
        let (prio, elem) = pair;
        Prioritized { prio, elem }
    }
}

/// Splits an entry back into its `(priority, payload)` tuple.
impl<Prio, T> From<Prioritized<Prio, T>> for (Prio, T) {
    fn from(entry: Prioritized<Prio, T>) -> Self {
        (entry.prio, entry.elem)
    }
}

/// Total order by priority only.
impl<Prio, T> Ord for Prioritized<Prio, T>
where
    Prio: Ord,
{
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        Ord::cmp(&self.prio, &other.prio)
    }
}

/// Partial order by priority only; available for any `Prio: PartialOrd`.
impl<Prio, T> PartialOrd for Prioritized<Prio, T>
where
    Prio: PartialOrd,
{
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        PartialOrd::partial_cmp(&self.prio, &other.prio)
    }
}

/// Marker impl: equality on priorities is an equivalence relation whenever `Prio: Eq`.
impl<Prio, T> Eq for Prioritized<Prio, T> where Prio: Eq {}

/// Equality by priority only.
impl<Prio, T> PartialEq for Prioritized<Prio, T>
where
    Prio: PartialEq,
{
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.prio, &other.prio)
    }
}
165
166pub struct PrioQueue<Prio, T>(BinaryHeap<Prioritized<Prio, T>>);
167
168impl<Prio: Ord, T> GetUpdate for PrioQueue<Prio, T> {
169    type G = T;
170
171    type U = (Prio, T);
172
173    fn get(&mut self) -> Option<Self::G> {
174        let tmp: (Prio, Self::G) = self.0.pop()?.into();
175        Some(tmp.1)
176    }
177
178    fn update(&mut self, u: Self::U) -> Unblocked {
179        self.0.push(u.into());
180        Unblocked::One
181    }
182
183    fn update_multiples(&mut self, us: impl IntoIterator<Item = Self::U>) -> Unblocked {
184        let it = us.into_iter();
185        let (_, ub) = it.size_hint();
186        self.0.extend(it.map(Prioritized::from));
187        Unblocked::from_size_hint_ub(ub)
188    }
189}