differential_dataflow/trace/wrappers/
frontier.rs

//! Wrapper for frontiered trace.
//!
//! Wraps a trace with `since` and `until` frontiers so that all exposed timestamps are first advanced
//! by the `since` frontier and then restricted by the `until` frontier. This presents a deterministic
//! trace on the interval `[since, until)`, presenting only accumulations up to `since` (rather than
//! partially accumulated updates) and no updates at times greater than or equal to `until` (even as
//! parts of batches that span that time).
8
9use timely::progress::Timestamp;
10use timely::progress::{Antichain, frontier::AntichainRef};
11
12use crate::trace::{TraceReader, BatchReader, Description};
13use crate::trace::cursor::Cursor;
14use crate::lattice::Lattice;
15
16/// Wrapper to provide trace to nested scope.
17pub struct TraceFrontier<Tr>
18where
19    Tr: TraceReader,
20{
21    trace: Tr,
22    /// Frontier to which all update times will be advanced.
23    since: Antichain<Tr::Time>,
24    /// Frontier after which all update times will be suppressed.
25    until: Antichain<Tr::Time>,
26}
27
28impl<Tr> Clone for TraceFrontier<Tr>
29where
30    Tr: TraceReader+Clone,
31    Tr::Time: Clone,
32{
33    fn clone(&self) -> Self {
34        TraceFrontier {
35            trace: self.trace.clone(),
36            since: self.since.clone(),
37            until: self.until.clone(),
38        }
39    }
40}
41
42impl<Tr> TraceReader for TraceFrontier<Tr>
43where
44    Tr: TraceReader,
45    Tr::Batch: Clone,
46    Tr::Time: Timestamp+Lattice,
47    Tr::Diff: 'static,
48{
49    type Key<'a> = Tr::Key<'a>;
50    type KeyOwned = Tr::KeyOwned;
51    type Val<'a> = Tr::Val<'a>;
52    type ValOwned = Tr::ValOwned;
53    type Time = Tr::Time;
54    type Diff = Tr::Diff;
55
56    type Batch = BatchFrontier<Tr::Batch>;
57    type Storage = Tr::Storage;
58    type Cursor = CursorFrontier<Tr::Cursor, Tr::Time>;
59
60    fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
61        let since = self.since.borrow();
62        let until = self.until.borrow();
63        self.trace.map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), since, until)))
64    }
65
66    fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_logical_compaction(frontier) }
67    fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_logical_compaction() }
68
69    fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_physical_compaction(frontier) }
70    fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_physical_compaction() }
71
72    fn cursor_through(&mut self, upper: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
73        let since = self.since.borrow();
74        let until = self.until.borrow();
75        self.trace.cursor_through(upper).map(|(x,y)| (CursorFrontier::new(x, since, until), y))
76    }
77}
78
79impl<Tr> TraceFrontier<Tr>
80where
81    Tr: TraceReader,
82    Tr::Time: Timestamp,
83{
84    /// Makes a new trace wrapper
85    pub fn make_from(trace: Tr, since: AntichainRef<Tr::Time>, until: AntichainRef<Tr::Time>) -> Self {
86        TraceFrontier {
87            trace,
88            since: since.to_owned(),
89            until: until.to_owned(),
90        }
91    }
92}
93
94
95/// Wrapper to provide batch to nested scope.
96#[derive(Clone)]
97pub struct BatchFrontier<B: BatchReader> {
98    batch: B,
99    since: Antichain<B::Time>,
100    until: Antichain<B::Time>,
101}
102
103impl<B> BatchReader for BatchFrontier<B>
104where
105    B: BatchReader,
106    B::Time: Timestamp+Lattice,
107{
108    type Key<'a> = B::Key<'a>;
109    type KeyOwned = B::KeyOwned;
110    type Val<'a> = B::Val<'a>;
111    type ValOwned = B::ValOwned;
112    type Time = B::Time;
113    type Diff = B::Diff;
114
115    type Cursor = BatchCursorFrontier<B::Cursor>;
116
117    fn cursor(&self) -> Self::Cursor {
118        BatchCursorFrontier::new(self.batch.cursor(), self.since.borrow(), self.until.borrow())
119    }
120    fn len(&self) -> usize { self.batch.len() }
121    fn description(&self) -> &Description<B::Time> { self.batch.description() }
122}
123
124impl<B> BatchFrontier<B>
125where
126    B: BatchReader,
127    B::Time: Timestamp+Lattice,
128{
129    /// Makes a new batch wrapper
130    pub fn make_from(batch: B, since: AntichainRef<B::Time>, until: AntichainRef<B::Time>) -> Self {
131        BatchFrontier {
132            batch,
133            since: since.to_owned(),
134            until: until.to_owned(),
135        }
136    }
137}
138
139/// Wrapper to provide cursor to nested scope.
140pub struct CursorFrontier<C, T> {
141    cursor: C,
142    since: Antichain<T>,
143    until: Antichain<T>
144}
145
146impl<C, T> CursorFrontier<C, T> where T: Clone {
147    fn new(cursor: C, since: AntichainRef<T>, until: AntichainRef<T>) -> Self {
148        CursorFrontier {
149            cursor,
150            since: since.to_owned(),
151            until: until.to_owned(),
152        }
153    }
154}
155
156impl<C, T> Cursor for CursorFrontier<C, T>
157where
158    C: Cursor<Time=T>,
159    T: Timestamp+Lattice,
160{
161    type Key<'a> = C::Key<'a>;
162    type KeyOwned = C::KeyOwned;
163    type Val<'a> = C::Val<'a>;
164    type ValOwned = C::ValOwned;
165    type Time = C::Time;
166    type Diff = C::Diff;
167
168    type Storage = C::Storage;
169
170    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
171    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
172
173    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
174    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
175
176    #[inline]
177    fn map_times<L: FnMut(&Self::Time,&Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L) {
178        let since = self.since.borrow();
179        let until = self.until.borrow();
180        let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
181        self.cursor.map_times(storage, |time, diff| {
182            temp.clone_from(time);
183            temp.advance_by(since);
184            if !until.less_equal(&temp) {
185                logic(&temp, diff);
186            }
187        })
188    }
189
190    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
191    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
192
193    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
194    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
195
196    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
197    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
198}
199
200
201
202/// Wrapper to provide cursor to nested scope.
203pub struct BatchCursorFrontier<C: Cursor> {
204    cursor: C,
205    since: Antichain<C::Time>,
206    until: Antichain<C::Time>,
207}
208
209impl<C: Cursor> BatchCursorFrontier<C> where C::Time: Clone {
210    fn new(cursor: C, since: AntichainRef<C::Time>, until: AntichainRef<C::Time>) -> Self {
211        BatchCursorFrontier {
212            cursor,
213            since: since.to_owned(),
214            until: until.to_owned(),
215        }
216    }
217}
218
219impl<C: Cursor> Cursor for BatchCursorFrontier<C>
220where
221    C::Time: Timestamp+Lattice,
222    C::Storage: BatchReader,
223{
224    type Key<'a> = C::Key<'a>;
225    type KeyOwned = C::KeyOwned;
226    type Val<'a> = C::Val<'a>;
227    type ValOwned = C::ValOwned;
228    type Time = C::Time;
229    type Diff = C::Diff;
230
231    type Storage = BatchFrontier<C::Storage>;
232
233    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
234    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
235
236    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
237    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
238
239    #[inline]
240    fn map_times<L: FnMut(&Self::Time,&Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L) {
241        let since = self.since.borrow();
242        let until = self.until.borrow();
243        let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
244        self.cursor.map_times(&storage.batch, |time, diff| {
245            temp.clone_from(time);
246            temp.advance_by(since);
247            if !until.less_equal(&temp) {
248                logic(&temp, diff);
249            }
250        })
251    }
252
253    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
254    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
255
256    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
257    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
258
259    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
260    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
261}