palimpsest_dataflow/dynamic/
pointstamp.rs1use columnar::Columnar;
15use serde::{Deserialize, Serialize};
16
17#[derive(
22 Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Columnar,
23)]
24#[columnar(derive(Eq, PartialEq, Ord, PartialOrd))]
25pub struct PointStamp<T> {
26 vector: Vec<T>,
28}
29
30impl<T: Timestamp> PartialEq<[T]> for PointStamp<T> {
31 fn eq(&self, other: &[T]) -> bool {
32 self.vector
33 .iter()
34 .zip(other.iter().chain(std::iter::repeat(&T::minimum())))
35 .all(|(t1, t2)| t1.eq(t2))
36 }
37}
38
39impl<T: Timestamp> PartialEq<PointStamp<T>> for [T] {
40 fn eq(&self, other: &PointStamp<T>) -> bool {
41 self.iter()
42 .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
43 .all(|(t1, t2)| t1.eq(t2))
44 }
45}
46
47impl<T: Timestamp> PartialOrder<[T]> for PointStamp<T> {
48 fn less_equal(&self, other: &[T]) -> bool {
49 self.vector
50 .iter()
51 .zip(other.iter().chain(std::iter::repeat(&T::minimum())))
52 .all(|(t1, t2)| t1.less_equal(t2))
53 }
54}
55
56impl<T: Timestamp> PartialOrder<PointStamp<T>> for [T] {
57 fn less_equal(&self, other: &PointStamp<T>) -> bool {
58 self.iter()
59 .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
60 .all(|(t1, t2)| t1.less_equal(t2))
61 }
62}
63
64impl<T: Timestamp> PointStamp<T> {
65 pub fn new(mut vector: Vec<T>) -> Self {
69 while vector.last() == Some(&T::minimum()) {
70 vector.pop();
71 }
72 PointStamp { vector }
73 }
74 pub fn into_vec(self) -> Vec<T> {
80 self.vector
81 }
82}
83
84impl<T> std::ops::Deref for PointStamp<T> {
85 type Target = [T];
86 fn deref(&self) -> &Self::Target {
87 &self.vector
88 }
89}
90
91use timely::order::PartialOrder;
93impl<T: PartialOrder + Timestamp> PartialOrder for PointStamp<T> {
94 fn less_equal(&self, other: &Self) -> bool {
95 self.vector
100 .iter()
101 .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
102 .all(|(t1, t2)| t1.less_equal(t2))
103 }
104}
105
106use timely::progress::timestamp::Refines;
107impl<T: Timestamp> Refines<()> for PointStamp<T> {
108 fn to_inner(_outer: ()) -> Self {
109 Self { vector: Vec::new() }
110 }
111 fn to_outer(self) -> () {
112 ()
113 }
114 fn summarize(_summary: <Self>::Summary) -> () {
115 ()
116 }
117}
118
119use timely::progress::PathSummary;
122
123#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
125pub struct PointStampSummary<TS> {
126 pub retain: Option<usize>,
130 pub actions: Vec<TS>,
135}
136
137impl<T: Timestamp> PathSummary<PointStamp<T>> for PointStampSummary<T::Summary> {
138 fn results_in(&self, timestamp: &PointStamp<T>) -> Option<PointStamp<T>> {
139 let timestamps = if let Some(retain) = self.retain {
141 if retain < timestamp.vector.len() {
142 ×tamp.vector[..retain]
143 } else {
144 ×tamp.vector[..]
145 }
146 } else {
147 ×tamp.vector[..]
148 };
149
150 let mut vector = Vec::with_capacity(std::cmp::max(timestamps.len(), self.actions.len()));
151 let min_len = std::cmp::min(timestamps.len(), self.actions.len());
153 for (action, timestamp) in self.actions.iter().zip(timestamps.iter()) {
154 vector.push(action.results_in(timestamp)?);
155 }
156 for timestamp in timestamps.iter().skip(min_len) {
158 vector.push(timestamp.clone());
159 }
160 for action in self.actions.iter().skip(min_len) {
162 vector.push(action.results_in(&T::minimum())?);
163 }
164
165 Some(PointStamp::new(vector))
166 }
167 fn followed_by(&self, other: &Self) -> Option<Self> {
168 let retain = match (self.retain, other.retain) {
170 (Some(x), Some(y)) => Some(std::cmp::min(x, y)),
171 (Some(x), None) => Some(x),
172 (None, Some(y)) => Some(y),
173 (None, None) => None,
174 };
175
176 let self_actions = if let Some(retain) = other.retain {
178 if retain < self.actions.len() {
179 &self.actions[..retain]
180 } else {
181 &self.actions[..]
182 }
183 } else {
184 &self.actions[..]
185 };
186
187 let mut actions =
188 Vec::with_capacity(std::cmp::max(self_actions.len(), other.actions.len()));
189 let min_len = std::cmp::min(self_actions.len(), other.actions.len());
191 for (action1, action2) in self_actions.iter().zip(other.actions.iter()) {
192 actions.push(action1.followed_by(action2)?);
193 }
194 actions.extend(self_actions.iter().skip(min_len).cloned());
196 actions.extend(other.actions.iter().skip(min_len).cloned());
198
199 Some(Self { retain, actions })
200 }
201}
202
203impl<TS: PartialOrder> PartialOrder for PointStampSummary<TS> {
204 fn less_equal(&self, other: &Self) -> bool {
205 self.retain == other.retain
210 && self.actions.len() <= other.actions.len()
211 && self
212 .actions
213 .iter()
214 .zip(other.actions.iter())
215 .all(|(t1, t2)| t1.less_equal(t2))
216 }
217}
218
219use timely::progress::Timestamp;
221impl<T: Timestamp> Timestamp for PointStamp<T> {
222 fn minimum() -> Self {
223 Self::new(Vec::new())
224 }
225 type Summary = PointStampSummary<T::Summary>;
226}
227
228use crate::lattice::Lattice;
231impl<T: Lattice + Timestamp + Clone> Lattice for PointStamp<T> {
232 fn join(&self, other: &Self) -> Self {
233 let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
234 let max_len = ::std::cmp::max(self.vector.len(), other.vector.len());
235 let mut vector = Vec::with_capacity(max_len);
236 for index in 0..min_len {
238 vector.push(self.vector[index].join(&other.vector[index]));
239 }
240 for time in &self.vector[min_len..] {
242 vector.push(time.clone());
243 }
244 for time in &other.vector[min_len..] {
245 vector.push(time.clone());
246 }
247 Self::new(vector)
248 }
249 fn meet(&self, other: &Self) -> Self {
250 let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
251 let mut vector = Vec::with_capacity(min_len);
252 for index in 0..min_len {
254 vector.push(self.vector[index].meet(&other.vector[index]));
255 }
256 Self::new(vector)
258 }
259}
260
261mod columnation {
262 use columnation::{Columnation, Region};
263
264 use crate::dynamic::pointstamp::PointStamp;
265
266 impl<T: Columnation> Columnation for PointStamp<T> {
267 type InnerRegion = PointStampStack<T::InnerRegion>;
268 }
269
270 pub struct PointStampStack<R: Region<Item: Columnation>>(
272 <Vec<R::Item> as Columnation>::InnerRegion,
273 );
274
275 impl<R: Region<Item: Columnation>> Default for PointStampStack<R> {
276 #[inline]
277 fn default() -> Self {
278 Self(Default::default())
279 }
280 }
281
282 impl<R: Region<Item: Columnation>> Region for PointStampStack<R> {
283 type Item = PointStamp<R::Item>;
284
285 #[inline]
286 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
287 Self::Item {
288 vector: self.0.copy(&item.vector),
289 }
290 }
291
292 fn clear(&mut self) {
293 self.0.clear();
294 }
295
296 fn reserve_items<'a, I>(&mut self, items: I)
297 where
298 Self: 'a,
299 I: Iterator<Item = &'a Self::Item> + Clone,
300 {
301 self.0.reserve_items(items.map(|x| &x.vector));
302 }
303
304 fn reserve_regions<'a, I>(&mut self, regions: I)
305 where
306 Self: 'a,
307 I: Iterator<Item = &'a Self> + Clone,
308 {
309 self.0.reserve_regions(regions.map(|r| &r.0));
310 }
311
312 fn heap_size(&self, callback: impl FnMut(usize, usize)) {
313 self.0.heap_size(callback);
314 }
315 }
316}