differential_dataflow/trace/wrappers/
enter_at.rs

1//! Wrappers to provide trace access to nested scopes.
2
3use timely::progress::timestamp::Refines;
4use timely::progress::Timestamp;
5use timely::progress::{Antichain, frontier::AntichainRef};
6
7use crate::lattice::Lattice;
8use crate::trace::{TraceReader, BatchReader, Description};
9use crate::trace::cursor::Cursor;
10
11/// Wrapper to provide trace to nested scope.
12///
13/// Each wrapped update is presented with a timestamp determined by `logic`.
14///
15/// At the same time, we require a method `prior` that can "invert" timestamps,
16/// and which will be applied to compaction frontiers as they are communicated
17/// back to the wrapped traces. A better explanation is pending, and until that
18/// happens use this construct at your own peril!
19pub struct TraceEnter<Tr, TInner, F, G>
20where
21    Tr: TraceReader,
22{
23    trace: Tr,
24    stash1: Antichain<Tr::Time>,
25    stash2: Antichain<TInner>,
26    logic: F,
27    prior: G,
28}
29
30impl<Tr,TInner,F,G> Clone for TraceEnter<Tr, TInner, F, G>
31where
32    Tr: TraceReader+Clone,
33    F: Clone,
34    G: Clone,
35{
36    fn clone(&self) -> Self {
37        TraceEnter {
38            trace: self.trace.clone(),
39            stash1: Antichain::new(),
40            stash2: Antichain::new(),
41            logic: self.logic.clone(),
42            prior: self.prior.clone(),
43        }
44    }
45}
46
47impl<Tr, TInner, F, G> TraceReader for TraceEnter<Tr, TInner, F, G>
48where
49    Tr: TraceReader,
50    Tr::Batch: Clone,
51    Tr::Time: Timestamp,
52    TInner: Refines<Tr::Time>+Lattice,
53    Tr::Diff: 'static,
54    F: 'static,
55    F: FnMut(Tr::Key<'_>, Tr::Val<'_>, &Tr::Time)->TInner+Clone,
56    G: FnMut(&TInner)->Tr::Time+Clone+'static,
57{
58    type Key<'a> = Tr::Key<'a>;
59    type KeyOwned = Tr::KeyOwned;
60    type Val<'a> = Tr::Val<'a>;
61    type ValOwned = Tr::ValOwned;
62    type Time = TInner;
63    type Diff = Tr::Diff;
64
65    type Batch = BatchEnter<Tr::Batch, TInner,F>;
66    type Storage = Tr::Storage;
67    type Cursor = CursorEnter<Tr::Cursor, TInner,F>;
68
69    fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
70        let logic = self.logic.clone();
71        self.trace.map_batches(|batch| {
72            f(&Self::Batch::make_from(batch.clone(), logic.clone()));
73        })
74    }
75
76    fn set_logical_compaction(&mut self, frontier: AntichainRef<TInner>) {
77        self.stash1.clear();
78        for time in frontier.iter() {
79            self.stash1.insert((self.prior)(time));
80        }
81        self.trace.set_logical_compaction(self.stash1.borrow());
82    }
83    fn get_logical_compaction(&mut self) -> AntichainRef<TInner> {
84        self.stash2.clear();
85        for time in self.trace.get_logical_compaction().iter() {
86            self.stash2.insert(TInner::to_inner(time.clone()));
87        }
88        self.stash2.borrow()
89    }
90
91    fn set_physical_compaction(&mut self, frontier: AntichainRef<TInner>) {
92        self.stash1.clear();
93        for time in frontier.iter() {
94            self.stash1.insert((self.prior)(time));
95        }
96        self.trace.set_physical_compaction(self.stash1.borrow());
97    }
98    fn get_physical_compaction(&mut self) -> AntichainRef<TInner> {
99        self.stash2.clear();
100        for time in self.trace.get_physical_compaction().iter() {
101            self.stash2.insert(TInner::to_inner(time.clone()));
102        }
103        self.stash2.borrow()
104    }
105
106    fn cursor_through(&mut self, upper: AntichainRef<TInner>) -> Option<(Self::Cursor, Self::Storage)> {
107        self.stash1.clear();
108        for time in upper.iter() {
109            self.stash1.insert(time.clone().to_outer());
110        }
111        self.trace.cursor_through(self.stash1.borrow()).map(|(x,y)| (CursorEnter::new(x, self.logic.clone()), y))
112    }
113}
114
115impl<Tr, TInner, F, G> TraceEnter<Tr, TInner, F, G>
116where
117    Tr: TraceReader,
118    Tr::Time: Timestamp,
119    TInner: Refines<Tr::Time>+Lattice,
120{
121    /// Makes a new trace wrapper
122    pub fn make_from(trace: Tr, logic: F, prior: G) -> Self {
123        TraceEnter {
124            trace,
125            stash1: Antichain::new(),
126            stash2: Antichain::new(),
127            logic,
128            prior,
129        }
130    }
131}
132
133
134/// Wrapper to provide batch to nested scope.
135#[derive(Clone)]
136pub struct BatchEnter<B, TInner, F> {
137    batch: B,
138    description: Description<TInner>,
139    logic: F,
140}
141
142impl<B, TInner, F> BatchReader for BatchEnter<B, TInner, F>
143where
144    B: BatchReader,
145    B::Time: Timestamp,
146    TInner: Refines<B::Time>+Lattice,
147    F: FnMut(B::Key<'_>, <B::Cursor as Cursor>::Val<'_>, &B::Time)->TInner+Clone,
148{
149    type Key<'a> = B::Key<'a>;
150    type KeyOwned = B::KeyOwned;
151    type Val<'a> = B::Val<'a>;
152    type ValOwned = B::ValOwned;
153    type Time = TInner;
154    type Diff = B::Diff;
155
156    type Cursor = BatchCursorEnter<B::Cursor, TInner, F>;
157
158    fn cursor(&self) -> Self::Cursor {
159        BatchCursorEnter::new(self.batch.cursor(), self.logic.clone())
160    }
161    fn len(&self) -> usize { self.batch.len() }
162    fn description(&self) -> &Description<TInner> { &self.description }
163}
164
165impl<B, TInner, F> BatchEnter<B, TInner, F>
166where
167    B: BatchReader,
168    B::Time: Timestamp,
169    TInner: Refines<B::Time>+Lattice,
170{
171    /// Makes a new batch wrapper
172    pub fn make_from(batch: B, logic: F) -> Self {
173        let lower: Vec<_> = batch.description().lower().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
174        let upper: Vec<_> = batch.description().upper().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
175        let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
176
177        BatchEnter {
178            batch,
179            description: Description::new(Antichain::from(lower), Antichain::from(upper), Antichain::from(since)),
180            logic,
181        }
182    }
183}
184
185/// Wrapper to provide cursor to nested scope.
186pub struct CursorEnter<C, TInner, F> {
187    phantom: ::std::marker::PhantomData<TInner>,
188    cursor: C,
189    logic: F,
190}
191
192impl<C, TInner, F> CursorEnter<C, TInner, F> {
193    fn new(cursor: C, logic: F) -> Self {
194        CursorEnter {
195            phantom: ::std::marker::PhantomData,
196            cursor,
197            logic,
198        }
199    }
200}
201
202impl<C, TInner, F> Cursor for CursorEnter<C, TInner, F>
203where
204    C: Cursor,
205    C::Time: Timestamp,
206    TInner: Refines<C::Time>+Lattice,
207    F: FnMut(C::Key<'_>, C::Val<'_>, &C::Time)->TInner,
208{
209    type Key<'a> = C::Key<'a>;
210    type KeyOwned = C::KeyOwned;
211    type Val<'a> = C::Val<'a>;
212    type ValOwned = C::ValOwned;
213    type Time = TInner;
214    type Diff = C::Diff;
215
216    type Storage = C::Storage;
217
218    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
219    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
220
221    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
222    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
223
224    #[inline]
225    fn map_times<L: FnMut(&TInner, &Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L) {
226        let key = self.key(storage);
227        let val = self.val(storage);
228        let logic2 = &mut self.logic;
229        self.cursor.map_times(storage, |time, diff| {
230            logic(&logic2(key, val, time), diff)
231        })
232    }
233
234    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
235    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
236
237    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
238    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
239
240    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
241    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
242}
243
244
245
246/// Wrapper to provide cursor to nested scope.
247pub struct BatchCursorEnter<C, TInner, F> {
248    phantom: ::std::marker::PhantomData<TInner>,
249    cursor: C,
250    logic: F,
251}
252
253impl<C, TInner, F> BatchCursorEnter<C, TInner, F> {
254    fn new(cursor: C, logic: F) -> Self {
255        BatchCursorEnter {
256            phantom: ::std::marker::PhantomData,
257            cursor,
258            logic,
259        }
260    }
261}
262
263impl<TInner, C: Cursor, F> Cursor for BatchCursorEnter<C, TInner, F>
264where
265    C::Time: Timestamp,
266    TInner: Refines<C::Time>+Lattice,
267    F: FnMut(C::Key<'_>, C::Val<'_>, &C::Time)->TInner,
268{
269    type Key<'a> = C::Key<'a>;
270    type KeyOwned = C::KeyOwned;
271    type Val<'a> = C::Val<'a>;
272    type ValOwned = C::ValOwned;
273    type Time = TInner;
274    type Diff = C::Diff;
275
276    type Storage = BatchEnter<C::Storage, TInner, F>;
277
278    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
279    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
280
281    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
282    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
283
284    #[inline]
285    fn map_times<L: FnMut(&TInner, &Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L) {
286        let key = self.key(storage);
287        let val = self.val(storage);
288        let logic2 = &mut self.logic;
289        self.cursor.map_times(&storage.batch, |time, diff| {
290            logic(&logic2(key, val, time), diff)
291        })
292    }
293
294    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
295    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
296
297    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
298    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
299
300    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
301    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
302}