differential_dataflow/dynamic/
pointstamp.rs1use columnar::Columnar;
15use serde::{Deserialize, Serialize};
16use smallvec::SmallVec;
17
18#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Columnar)]
23#[columnar(derive(Eq, PartialEq, Ord, PartialOrd))]
24pub struct PointStamp<T> {
25 vector: SmallVec<[T; 1]>,
27}
28
29impl<T: Timestamp> PartialEq<[T]> for PointStamp<T> {
30 fn eq(&self, other: &[T]) -> bool {
31 self.vector.iter()
32 .zip(other.iter().chain(std::iter::repeat(&T::minimum())))
33 .all(|(t1, t2)| t1.eq(t2))
34 }
35}
36
37impl<T: Timestamp> PartialEq<PointStamp<T>> for [T] {
38 fn eq(&self, other: &PointStamp<T>) -> bool {
39 self.iter()
40 .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
41 .all(|(t1, t2)| t1.eq(t2))
42 }
43}
44
45impl<T: Timestamp> PartialOrder<[T]> for PointStamp<T> {
46 fn less_equal(&self, other: &[T]) -> bool {
47 self.vector.iter()
48 .zip(other.iter().chain(std::iter::repeat(&T::minimum())))
49 .all(|(t1, t2)| t1.less_equal(t2))
50 }
51}
52
53impl<T: Timestamp> PartialOrder<PointStamp<T>> for [T] {
54 fn less_equal(&self, other: &PointStamp<T>) -> bool {
55 self.iter()
56 .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
57 .all(|(t1, t2)| t1.less_equal(t2))
58 }
59}
60
61impl<T: Timestamp> PointStamp<T> {
62 pub fn new(mut vector: SmallVec<[T; 1]>) -> Self {
66 while vector.last() == Some(&T::minimum()) {
67 vector.pop();
68 }
69 PointStamp { vector }
70 }
71 pub fn into_inner(self) -> SmallVec<[T; 1]> {
77 self.vector
78 }
79}
80
81impl<T> std::ops::Deref for PointStamp<T> {
82 type Target = [T];
83 fn deref(&self) -> &Self::Target {
84 &self.vector
85 }
86}
87
88use timely::order::PartialOrder;
90impl<T: PartialOrder + Timestamp> PartialOrder for PointStamp<T> {
91 fn less_equal(&self, other: &Self) -> bool {
92 self.vector
97 .iter()
98 .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
99 .all(|(t1, t2)| t1.less_equal(t2))
100 }
101}
102
103use timely::progress::timestamp::Refines;
104impl<T: Timestamp> Refines<()> for PointStamp<T> {
105 fn to_inner(_outer: ()) -> Self {
106 Self { vector: Default::default() }
107 }
108 fn to_outer(self) -> () {
109 ()
110 }
111 fn summarize(_summary: <Self>::Summary) -> () {
112 ()
113 }
114}
115
116use timely::progress::PathSummary;
119
120#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
122pub struct PointStampSummary<TS> {
123 pub retain: Option<usize>,
127 pub actions: Vec<TS>,
132}
133
134impl<T: Timestamp> PathSummary<PointStamp<T>> for PointStampSummary<T::Summary> {
135 fn results_in(&self, timestamp: &PointStamp<T>) -> Option<PointStamp<T>> {
136 let timestamps = if let Some(retain) = self.retain {
138 if retain < timestamp.vector.len() {
139 ×tamp.vector[..retain]
140 } else {
141 ×tamp.vector[..]
142 }
143 } else {
144 ×tamp.vector[..]
145 };
146
147 let mut vector = Vec::with_capacity(std::cmp::max(timestamps.len(), self.actions.len()));
148 let min_len = std::cmp::min(timestamps.len(), self.actions.len());
150 for (action, timestamp) in self.actions.iter().zip(timestamps.iter()) {
151 vector.push(action.results_in(timestamp)?);
152 }
153 for timestamp in timestamps.iter().skip(min_len) {
155 vector.push(timestamp.clone());
156 }
157 for action in self.actions.iter().skip(min_len) {
159 vector.push(action.results_in(&T::minimum())?);
160 }
161
162 Some(PointStamp::new(vector.into()))
163 }
164 fn followed_by(&self, other: &Self) -> Option<Self> {
165 let retain = match (self.retain, other.retain) {
167 (Some(x), Some(y)) => Some(std::cmp::min(x, y)),
168 (Some(x), None) => Some(x),
169 (None, Some(y)) => Some(y),
170 (None, None) => None,
171 };
172
173 let self_actions = if let Some(retain) = other.retain {
175 if retain < self.actions.len() {
176 &self.actions[..retain]
177 } else {
178 &self.actions[..]
179 }
180 } else {
181 &self.actions[..]
182 };
183
184 let mut actions = Vec::with_capacity(std::cmp::max(self_actions.len(), other.actions.len()));
185 let min_len = std::cmp::min(self_actions.len(), other.actions.len());
187 for (action1, action2) in self_actions.iter().zip(other.actions.iter()) {
188 actions.push(action1.followed_by(action2)?);
189 }
190 actions.extend(self_actions.iter().skip(min_len).cloned());
192 actions.extend(other.actions.iter().skip(min_len).cloned());
194
195 Some(Self { retain, actions })
196 }
197}
198
199impl<TS: PartialOrder> PartialOrder for PointStampSummary<TS> {
200 fn less_equal(&self, other: &Self) -> bool {
201 self.retain == other.retain
206 && self.actions.len() <= other.actions.len()
207 && self
208 .actions
209 .iter()
210 .zip(other.actions.iter())
211 .all(|(t1, t2)| t1.less_equal(t2))
212 }
213}
214
215use timely::progress::Timestamp;
217impl<T: Timestamp> Timestamp for PointStamp<T> {
218 fn minimum() -> Self {
219 Self::new(Default::default())
220 }
221 type Summary = PointStampSummary<T::Summary>;
222}
223
224use crate::lattice::Lattice;
227impl<T: Lattice + Timestamp + Clone> Lattice for PointStamp<T> {
228 fn join(&self, other: &Self) -> Self {
229 let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
230 let max_len = ::std::cmp::max(self.vector.len(), other.vector.len());
231 let mut vector = SmallVec::with_capacity(max_len);
232 for index in 0..min_len {
234 vector.push(self.vector[index].join(&other.vector[index]));
235 }
236 for time in &self.vector[min_len..] {
238 vector.push(time.clone());
239 }
240 for time in &other.vector[min_len..] {
241 vector.push(time.clone());
242 }
243 Self::new(vector)
244 }
245 fn meet(&self, other: &Self) -> Self {
246 let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
247 let mut vector = SmallVec::with_capacity(min_len);
248 for index in 0..min_len {
250 vector.push(self.vector[index].meet(&other.vector[index]));
251 }
252 Self::new(vector)
254 }
255}
256
257mod columnation {
258 use columnation::{Columnation, Region};
259 use smallvec::SmallVec;
260 use crate::dynamic::pointstamp::PointStamp;
261
262 impl<T: Columnation+Clone> Columnation for PointStamp<T> {
263 type InnerRegion = PointStampStack<T::InnerRegion>;
264 }
265
266 pub struct PointStampStack<R: Region<Item: Columnation+Clone>>(<SmallVec<[R::Item; 1]> as Columnation>::InnerRegion);
268
269 impl<R: Region<Item: Columnation+Clone>> Default for PointStampStack<R> {
270 #[inline]
271 fn default() -> Self {
272 Self(Default::default())
273 }
274 }
275
276 impl<R: Region<Item: Columnation+Clone>> Region for PointStampStack<R> {
277 type Item = PointStamp<R::Item>;
278
279 #[inline]
280 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
281 Self::Item { vector: self.0.copy(&item.vector) }
282 }
283
284 fn clear(&mut self) {
285 self.0.clear();
286 }
287
288 fn reserve_items<'a, I>(&mut self, items: I) where Self: 'a, I: Iterator<Item=&'a Self::Item> + Clone {
289 self.0.reserve_items(items.map(|x| &x.vector));
290 }
291
292 fn reserve_regions<'a, I>(&mut self, regions: I) where Self: 'a, I: Iterator<Item=&'a Self> + Clone {
293 self.0.reserve_regions(regions.map(|r| &r.0));
294 }
295
296 fn heap_size(&self, callback: impl FnMut(usize, usize)) {
297 self.0.heap_size(callback);
298 }
299 }
300}