differential_dataflow/dynamic/
pointstamp.rs1use abomonation_derive::Abomonation;
15use serde::{Deserialize, Serialize};
16
17#[derive(
22 Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation,
23)]
24pub struct PointStamp<T> {
25 pub vector: Vec<T>,
27}
28
29impl<T> PointStamp<T> {
30 pub fn new(vector: Vec<T>) -> Self {
32 PointStamp { vector }
33 }
34}
35
36use timely::order::PartialOrder;
38impl<T: PartialOrder + Timestamp> PartialOrder for PointStamp<T> {
39 fn less_equal(&self, other: &Self) -> bool {
40 self.vector
45 .iter()
46 .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
47 .all(|(t1, t2)| t1.less_equal(t2))
48 }
49}
50
51use timely::progress::timestamp::Refines;
52impl<T: Timestamp> Refines<()> for PointStamp<T> {
53 fn to_inner(_outer: ()) -> Self {
54 Self { vector: Vec::new() }
55 }
56 fn to_outer(self) -> () {
57 ()
58 }
59 fn summarize(_summary: <Self>::Summary) -> () {
60 ()
61 }
62}
63
64use timely::progress::PathSummary;
67
68#[derive(
70 Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation
71)]
72pub struct PointStampSummary<TS> {
73 pub retain: Option<usize>,
77 pub actions: Vec<TS>,
82}
83
84impl<T: Timestamp> PathSummary<PointStamp<T>> for PointStampSummary<T::Summary> {
85 fn results_in(&self, timestamp: &PointStamp<T>) -> Option<PointStamp<T>> {
86 let timestamps = if let Some(retain) = self.retain {
88 if retain < timestamp.vector.len() {
89 ×tamp.vector[..retain]
90 } else {
91 ×tamp.vector[..]
92 }
93 } else {
94 ×tamp.vector[..]
95 };
96
97 let mut vector = Vec::with_capacity(std::cmp::max(timestamps.len(), self.actions.len()));
98 let min_len = std::cmp::min(timestamps.len(), self.actions.len());
100 for (action, timestamp) in self.actions.iter().zip(timestamps.iter()) {
101 vector.push(action.results_in(timestamp)?);
102 }
103 for timestamp in timestamps.iter().skip(min_len) {
105 vector.push(timestamp.clone());
106 }
107 for action in self.actions.iter().skip(min_len) {
109 vector.push(action.results_in(&T::minimum())?);
110 }
111
112 Some(PointStamp { vector })
113 }
114 fn followed_by(&self, other: &Self) -> Option<Self> {
115 let retain = match (self.retain, other.retain) {
117 (Some(x), Some(y)) => Some(std::cmp::min(x, y)),
118 (Some(x), None) => Some(x),
119 (None, Some(y)) => Some(y),
120 (None, None) => None,
121 };
122
123 let self_actions = if let Some(retain) = other.retain {
125 if retain < self.actions.len() {
126 &self.actions[..retain]
127 } else {
128 &self.actions[..]
129 }
130 } else {
131 &self.actions[..]
132 };
133
134 let mut actions = Vec::with_capacity(std::cmp::max(self_actions.len(), other.actions.len()));
135 let min_len = std::cmp::min(self_actions.len(), other.actions.len());
137 for (action1, action2) in self_actions.iter().zip(other.actions.iter()) {
138 actions.push(action1.followed_by(action2)?);
139 }
140 actions.extend(self_actions.iter().skip(min_len).cloned());
142 actions.extend(other.actions.iter().skip(min_len).cloned());
144
145 Some(Self { retain, actions })
146 }
147}
148
149impl<TS: PartialOrder> PartialOrder for PointStampSummary<TS> {
150 fn less_equal(&self, other: &Self) -> bool {
151 self.retain == other.retain
156 && self.actions.len() <= other.actions.len()
157 && self
158 .actions
159 .iter()
160 .zip(other.actions.iter())
161 .all(|(t1, t2)| t1.less_equal(t2))
162 }
163}
164
165use timely::progress::Timestamp;
167impl<T: Timestamp> Timestamp for PointStamp<T> {
168 fn minimum() -> Self {
169 Self { vector: Vec::new() }
170 }
171 type Summary = PointStampSummary<T::Summary>;
172}
173
174use crate::lattice::Lattice;
177impl<T: Lattice + Timestamp + Clone> Lattice for PointStamp<T> {
178 fn join(&self, other: &Self) -> Self {
179 let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
180 let max_len = ::std::cmp::max(self.vector.len(), other.vector.len());
181 let mut vector = Vec::with_capacity(max_len);
182 for index in 0..min_len {
184 vector.push(self.vector[index].join(&other.vector[index]));
185 }
186 for time in &self.vector[min_len..] {
188 vector.push(time.clone());
189 }
190 for time in &other.vector[min_len..] {
191 vector.push(time.clone());
192 }
193 Self { vector }
194 }
195 fn meet(&self, other: &Self) -> Self {
196 let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
197 let mut vector = Vec::with_capacity(min_len);
198 for index in 0..min_len {
200 vector.push(self.vector[index].meet(&other.vector[index]));
201 }
202 Self { vector }
204 }
205}
206
207use timely::container::columnation::{Columnation, Region};
208impl<T: Columnation> Columnation for PointStamp<T> {
209 type InnerRegion = PointStampStack<T::InnerRegion>;
210}
211
212pub struct PointStampStack<R: Region>(<Vec<R::Item> as Columnation>::InnerRegion)
214where
215 <R as Region>::Item: Columnation;
216
217impl<R: Region> Default for PointStampStack<R>
218 where
219 <R as Region>::Item: Columnation
220{
221 #[inline]
222 fn default() -> Self {
223 Self(Default::default())
224 }
225}
226
227impl<R: Region> Region for PointStampStack<R>
228 where
229 <R as Region>::Item: Columnation
230{
231 type Item = PointStamp<R::Item>;
232
233 #[inline]
234 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
235 Self::Item { vector: self.0.copy(&item.vector) }
236 }
237
238 fn clear(&mut self) {
239 self.0.clear();
240 }
241
242 fn reserve_items<'a, I>(&mut self, items: I) where Self: 'a, I: Iterator<Item=&'a Self::Item> + Clone {
243 self.0.reserve_items(items.map(|x| &x.vector));
244 }
245
246 fn reserve_regions<'a, I>(&mut self, regions: I) where Self: 'a, I: Iterator<Item=&'a Self> + Clone {
247 self.0.reserve_regions(regions.map(|r| &r.0));
248 }
249
250 fn heap_size(&self, callback: impl FnMut(usize, usize)) {
251 self.0.heap_size(callback);
252 }
253}