Skip to main content

palimpsest_dataflow/trace/wrappers/
frontier.rs

1//! Wrapper for frontiered trace.
2//!
3//! Wraps a trace with `since` and `upper` frontiers so that all exposed timestamps are first advanced
4//! by the `since` frontier and restricted by the `upper` frontier. This presents a deterministic trace
5//! on the interval `[since, upper)`, presenting only accumulations up to `since` (rather than partially
6//! accumulated updates) and no updates at times greater or equal to `upper` (even as parts of batches
7//! that span that time).
8
9use timely::progress::{frontier::AntichainRef, Antichain};
10
11use crate::lattice::Lattice;
12use crate::trace::cursor::Cursor;
13use crate::trace::{BatchReader, Description, TraceReader};
14
15/// Wrapper to provide trace to nested scope.
16pub struct TraceFrontier<Tr: TraceReader> {
17    trace: Tr,
18    /// Frontier to which all update times will be advanced.
19    since: Antichain<Tr::Time>,
20    /// Frontier after which all update times will be suppressed.
21    until: Antichain<Tr::Time>,
22}
23
24impl<Tr: TraceReader + Clone> Clone for TraceFrontier<Tr> {
25    fn clone(&self) -> Self {
26        TraceFrontier {
27            trace: self.trace.clone(),
28            since: self.since.clone(),
29            until: self.until.clone(),
30        }
31    }
32}
33
34impl<Tr: TraceReader> WithLayout for TraceFrontier<Tr> {
35    type Layout = (
36        <Tr::Layout as Layout>::KeyContainer,
37        <Tr::Layout as Layout>::ValContainer,
38        Vec<Tr::Time>,
39        <Tr::Layout as Layout>::DiffContainer,
40        <Tr::Layout as Layout>::OffsetContainer,
41    );
42}
43
44impl<Tr: TraceReader> TraceReader for TraceFrontier<Tr> {
45    type Batch = BatchFrontier<Tr::Batch>;
46    type Storage = Tr::Storage;
47    type Cursor = CursorFrontier<Tr::Cursor, Tr::Time>;
48
49    fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
50        let since = self.since.borrow();
51        let until = self.until.borrow();
52        self.trace
53            .map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), since, until)))
54    }
55
56    fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
57        self.trace.set_logical_compaction(frontier)
58    }
59    fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
60        self.trace.get_logical_compaction()
61    }
62
63    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
64        self.trace.set_physical_compaction(frontier)
65    }
66    fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
67        self.trace.get_physical_compaction()
68    }
69
70    fn cursor_through(
71        &mut self,
72        upper: AntichainRef<'_, Tr::Time>,
73    ) -> Option<(Self::Cursor, Self::Storage)> {
74        let since = self.since.borrow();
75        let until = self.until.borrow();
76        self.trace
77            .cursor_through(upper)
78            .map(|(x, y)| (CursorFrontier::new(x, since, until), y))
79    }
80}
81
82impl<Tr: TraceReader> TraceFrontier<Tr> {
83    /// Makes a new trace wrapper
84    pub fn make_from(
85        trace: Tr,
86        since: AntichainRef<'_, Tr::Time>,
87        until: AntichainRef<'_, Tr::Time>,
88    ) -> Self {
89        TraceFrontier {
90            trace,
91            since: since.to_owned(),
92            until: until.to_owned(),
93        }
94    }
95}
96
97/// Wrapper to provide batch to nested scope.
98#[derive(Clone)]
99pub struct BatchFrontier<B: BatchReader> {
100    batch: B,
101    since: Antichain<B::Time>,
102    until: Antichain<B::Time>,
103}
104
105impl<B: BatchReader> WithLayout for BatchFrontier<B> {
106    type Layout = (
107        <B::Layout as Layout>::KeyContainer,
108        <B::Layout as Layout>::ValContainer,
109        Vec<B::Time>,
110        <B::Layout as Layout>::DiffContainer,
111        <B::Layout as Layout>::OffsetContainer,
112    );
113}
114
115impl<B: BatchReader> BatchReader for BatchFrontier<B> {
116    type Cursor = BatchCursorFrontier<B::Cursor>;
117
118    fn cursor(&self) -> Self::Cursor {
119        BatchCursorFrontier::new(
120            self.batch.cursor(),
121            self.since.borrow(),
122            self.until.borrow(),
123        )
124    }
125    fn len(&self) -> usize {
126        self.batch.len()
127    }
128    fn description(&self) -> &Description<B::Time> {
129        self.batch.description()
130    }
131}
132
133impl<B: BatchReader> BatchFrontier<B> {
134    /// Makes a new batch wrapper
135    pub fn make_from(batch: B, since: AntichainRef<B::Time>, until: AntichainRef<B::Time>) -> Self {
136        BatchFrontier {
137            batch,
138            since: since.to_owned(),
139            until: until.to_owned(),
140        }
141    }
142}
143
144/// Wrapper to provide cursor to nested scope.
145pub struct CursorFrontier<C, T> {
146    cursor: C,
147    since: Antichain<T>,
148    until: Antichain<T>,
149}
150
151use crate::trace::implementations::{Layout, WithLayout};
152impl<C: Cursor> WithLayout for CursorFrontier<C, C::Time> {
153    type Layout = (
154        <C::Layout as Layout>::KeyContainer,
155        <C::Layout as Layout>::ValContainer,
156        Vec<C::Time>,
157        <C::Layout as Layout>::DiffContainer,
158        <C::Layout as Layout>::OffsetContainer,
159    );
160}
161
162impl<C, T: Clone> CursorFrontier<C, T> {
163    fn new(cursor: C, since: AntichainRef<T>, until: AntichainRef<T>) -> Self {
164        CursorFrontier {
165            cursor,
166            since: since.to_owned(),
167            until: until.to_owned(),
168        }
169    }
170}
171
172impl<C: Cursor> Cursor for CursorFrontier<C, C::Time> {
173    type Storage = C::Storage;
174
175    #[inline]
176    fn key_valid(&self, storage: &Self::Storage) -> bool {
177        self.cursor.key_valid(storage)
178    }
179    #[inline]
180    fn val_valid(&self, storage: &Self::Storage) -> bool {
181        self.cursor.val_valid(storage)
182    }
183
184    #[inline]
185    fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
186        self.cursor.key(storage)
187    }
188    #[inline]
189    fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
190        self.cursor.val(storage)
191    }
192
193    #[inline]
194    fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
195        self.cursor.get_key(storage)
196    }
197    #[inline]
198    fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
199        self.cursor.get_val(storage)
200    }
201
202    #[inline]
203    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(
204        &mut self,
205        storage: &Self::Storage,
206        mut logic: L,
207    ) {
208        let since = self.since.borrow();
209        let until = self.until.borrow();
210        let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
211        self.cursor.map_times(storage, |time, diff| {
212            C::clone_time_onto(time, &mut temp);
213            temp.advance_by(since);
214            if !until.less_equal(&temp) {
215                logic(&temp, diff);
216            }
217        })
218    }
219
220    #[inline]
221    fn step_key(&mut self, storage: &Self::Storage) {
222        self.cursor.step_key(storage)
223    }
224    #[inline]
225    fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
226        self.cursor.seek_key(storage, key)
227    }
228
229    #[inline]
230    fn step_val(&mut self, storage: &Self::Storage) {
231        self.cursor.step_val(storage)
232    }
233    #[inline]
234    fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
235        self.cursor.seek_val(storage, val)
236    }
237
238    #[inline]
239    fn rewind_keys(&mut self, storage: &Self::Storage) {
240        self.cursor.rewind_keys(storage)
241    }
242    #[inline]
243    fn rewind_vals(&mut self, storage: &Self::Storage) {
244        self.cursor.rewind_vals(storage)
245    }
246}
247
248/// Wrapper to provide cursor to nested scope.
249pub struct BatchCursorFrontier<C: Cursor> {
250    cursor: C,
251    since: Antichain<C::Time>,
252    until: Antichain<C::Time>,
253}
254
255impl<C: Cursor> WithLayout for BatchCursorFrontier<C> {
256    type Layout = (
257        <C::Layout as Layout>::KeyContainer,
258        <C::Layout as Layout>::ValContainer,
259        Vec<C::Time>,
260        <C::Layout as Layout>::DiffContainer,
261        <C::Layout as Layout>::OffsetContainer,
262    );
263}
264
265impl<C: Cursor> BatchCursorFrontier<C> {
266    fn new(cursor: C, since: AntichainRef<C::Time>, until: AntichainRef<C::Time>) -> Self {
267        BatchCursorFrontier {
268            cursor,
269            since: since.to_owned(),
270            until: until.to_owned(),
271        }
272    }
273}
274
275impl<C: Cursor<Storage: BatchReader>> Cursor for BatchCursorFrontier<C> {
276    type Storage = BatchFrontier<C::Storage>;
277
278    #[inline]
279    fn key_valid(&self, storage: &Self::Storage) -> bool {
280        self.cursor.key_valid(&storage.batch)
281    }
282    #[inline]
283    fn val_valid(&self, storage: &Self::Storage) -> bool {
284        self.cursor.val_valid(&storage.batch)
285    }
286
287    #[inline]
288    fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
289        self.cursor.key(&storage.batch)
290    }
291    #[inline]
292    fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
293        self.cursor.val(&storage.batch)
294    }
295
296    #[inline]
297    fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
298        self.cursor.get_key(&storage.batch)
299    }
300    #[inline]
301    fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
302        self.cursor.get_val(&storage.batch)
303    }
304
305    #[inline]
306    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(
307        &mut self,
308        storage: &Self::Storage,
309        mut logic: L,
310    ) {
311        let since = self.since.borrow();
312        let until = self.until.borrow();
313        let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
314        self.cursor.map_times(&storage.batch, |time, diff| {
315            C::clone_time_onto(time, &mut temp);
316            temp.advance_by(since);
317            if !until.less_equal(&temp) {
318                logic(&temp, diff);
319            }
320        })
321    }
322
323    #[inline]
324    fn step_key(&mut self, storage: &Self::Storage) {
325        self.cursor.step_key(&storage.batch)
326    }
327    #[inline]
328    fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
329        self.cursor.seek_key(&storage.batch, key)
330    }
331
332    #[inline]
333    fn step_val(&mut self, storage: &Self::Storage) {
334        self.cursor.step_val(&storage.batch)
335    }
336    #[inline]
337    fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
338        self.cursor.seek_val(&storage.batch, val)
339    }
340
341    #[inline]
342    fn rewind_keys(&mut self, storage: &Self::Storage) {
343        self.cursor.rewind_keys(&storage.batch)
344    }
345    #[inline]
346    fn rewind_vals(&mut self, storage: &Self::Storage) {
347        self.cursor.rewind_vals(&storage.batch)
348    }
349}