differential_dataflow/trace/wrappers/
enter_at.rs

1//! Wrappers to provide trace access to nested scopes.
2
3use timely::progress::timestamp::Refines;
4use timely::progress::{Antichain, frontier::AntichainRef};
5
6use crate::lattice::Lattice;
7use crate::trace::{TraceReader, BatchReader, Description};
8use crate::trace::cursor::Cursor;
9
10/// Wrapper to provide trace to nested scope.
11///
12/// Each wrapped update is presented with a timestamp determined by `logic`.
13///
14/// At the same time, we require a method `prior` that can "invert" timestamps,
15/// and which will be applied to compaction frontiers as they are communicated
16/// back to the wrapped traces. A better explanation is pending, and until that
17/// happens use this construct at your own peril!
18pub struct TraceEnter<Tr: TraceReader, TInner, F, G> {
19    trace: Tr,
20    stash1: Antichain<Tr::Time>,
21    stash2: Antichain<TInner>,
22    logic: F,
23    prior: G,
24}
25
26impl<Tr,TInner,F,G> Clone for TraceEnter<Tr, TInner, F, G>
27where
28    Tr: TraceReader+Clone,
29    F: Clone,
30    G: Clone,
31{
32    fn clone(&self) -> Self {
33        TraceEnter {
34            trace: self.trace.clone(),
35            stash1: Antichain::new(),
36            stash2: Antichain::new(),
37            logic: self.logic.clone(),
38            prior: self.prior.clone(),
39        }
40    }
41}
42
43impl<Tr, TInner, F, G> TraceReader for TraceEnter<Tr, TInner, F, G>
44where
45    Tr: TraceReader,
46    Tr::Batch: Clone,
47    TInner: Refines<Tr::Time>+Lattice,
48    F: 'static,
49    F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone,
50    G: FnMut(&TInner)->Tr::Time+Clone+'static,
51{
52    type Key<'a> = Tr::Key<'a>;
53    type Val<'a> = Tr::Val<'a>;
54    type Time = TInner;
55    type TimeGat<'a> = &'a TInner;
56    type Diff = Tr::Diff;
57    type DiffGat<'a> = Tr::DiffGat<'a>;
58
59    type Batch = BatchEnter<Tr::Batch, TInner,F>;
60    type Storage = Tr::Storage;
61    type Cursor = CursorEnter<Tr::Cursor, TInner,F>;
62
63    fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
64        let logic = self.logic.clone();
65        self.trace.map_batches(|batch| {
66            f(&Self::Batch::make_from(batch.clone(), logic.clone()));
67        })
68    }
69
70    fn set_logical_compaction(&mut self, frontier: AntichainRef<TInner>) {
71        self.stash1.clear();
72        for time in frontier.iter() {
73            self.stash1.insert((self.prior)(time));
74        }
75        self.trace.set_logical_compaction(self.stash1.borrow());
76    }
77    fn get_logical_compaction(&mut self) -> AntichainRef<TInner> {
78        self.stash2.clear();
79        for time in self.trace.get_logical_compaction().iter() {
80            self.stash2.insert(TInner::to_inner(time.clone()));
81        }
82        self.stash2.borrow()
83    }
84
85    fn set_physical_compaction(&mut self, frontier: AntichainRef<TInner>) {
86        self.stash1.clear();
87        for time in frontier.iter() {
88            self.stash1.insert((self.prior)(time));
89        }
90        self.trace.set_physical_compaction(self.stash1.borrow());
91    }
92    fn get_physical_compaction(&mut self) -> AntichainRef<TInner> {
93        self.stash2.clear();
94        for time in self.trace.get_physical_compaction().iter() {
95            self.stash2.insert(TInner::to_inner(time.clone()));
96        }
97        self.stash2.borrow()
98    }
99
100    fn cursor_through(&mut self, upper: AntichainRef<TInner>) -> Option<(Self::Cursor, Self::Storage)> {
101        self.stash1.clear();
102        for time in upper.iter() {
103            self.stash1.insert(time.clone().to_outer());
104        }
105        self.trace.cursor_through(self.stash1.borrow()).map(|(x,y)| (CursorEnter::new(x, self.logic.clone()), y))
106    }
107}
108
109impl<Tr, TInner, F, G> TraceEnter<Tr, TInner, F, G>
110where
111    Tr: TraceReader,
112    TInner: Refines<Tr::Time>+Lattice,
113{
114    /// Makes a new trace wrapper
115    pub fn make_from(trace: Tr, logic: F, prior: G) -> Self {
116        TraceEnter {
117            trace,
118            stash1: Antichain::new(),
119            stash2: Antichain::new(),
120            logic,
121            prior,
122        }
123    }
124}
125
126
127/// Wrapper to provide batch to nested scope.
128#[derive(Clone)]
129pub struct BatchEnter<B, TInner, F> {
130    batch: B,
131    description: Description<TInner>,
132    logic: F,
133}
134
135impl<B, TInner, F> BatchReader for BatchEnter<B, TInner, F>
136where
137    B: BatchReader,
138    TInner: Refines<B::Time>+Lattice,
139    F: FnMut(B::Key<'_>, <B::Cursor as Cursor>::Val<'_>, B::TimeGat<'_>)->TInner+Clone,
140{
141    type Key<'a> = B::Key<'a>;
142    type Val<'a> = B::Val<'a>;
143    type Time = TInner;
144    type TimeGat<'a> = &'a TInner;
145    type Diff = B::Diff;
146    type DiffGat<'a> = B::DiffGat<'a>;
147
148    type Cursor = BatchCursorEnter<B::Cursor, TInner, F>;
149
150    fn cursor(&self) -> Self::Cursor {
151        BatchCursorEnter::new(self.batch.cursor(), self.logic.clone())
152    }
153    fn len(&self) -> usize { self.batch.len() }
154    fn description(&self) -> &Description<TInner> { &self.description }
155}
156
157impl<B, TInner, F> BatchEnter<B, TInner, F>
158where
159    B: BatchReader,
160    TInner: Refines<B::Time>+Lattice,
161{
162    /// Makes a new batch wrapper
163    pub fn make_from(batch: B, logic: F) -> Self {
164        let lower: Vec<_> = batch.description().lower().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
165        let upper: Vec<_> = batch.description().upper().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
166        let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
167
168        BatchEnter {
169            batch,
170            description: Description::new(Antichain::from(lower), Antichain::from(upper), Antichain::from(since)),
171            logic,
172        }
173    }
174}
175
176/// Wrapper to provide cursor to nested scope.
177pub struct CursorEnter<C, TInner, F> {
178    phantom: ::std::marker::PhantomData<TInner>,
179    cursor: C,
180    logic: F,
181}
182
183impl<C, TInner, F> CursorEnter<C, TInner, F> {
184    fn new(cursor: C, logic: F) -> Self {
185        CursorEnter {
186            phantom: ::std::marker::PhantomData,
187            cursor,
188            logic,
189        }
190    }
191}
192
193impl<C, TInner, F> Cursor for CursorEnter<C, TInner, F>
194where
195    C: Cursor,
196    TInner: Refines<C::Time>+Lattice,
197    F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>)->TInner,
198{
199    type Key<'a> = C::Key<'a>;
200    type Val<'a> = C::Val<'a>;
201    type Time = TInner;
202    type TimeGat<'a> = &'a TInner;
203    type Diff = C::Diff;
204    type DiffGat<'a> = C::DiffGat<'a>;
205
206    type Storage = C::Storage;
207
208    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
209    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
210
211    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
212    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
213
214    #[inline]
215    fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
216        let key = self.key(storage);
217        let val = self.val(storage);
218        let logic2 = &mut self.logic;
219        self.cursor.map_times(storage, |time, diff| {
220            logic(&logic2(key, val, time), diff)
221        })
222    }
223
224    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
225    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
226
227    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
228    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
229
230    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
231    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
232}
233
234
235
236/// Wrapper to provide cursor to nested scope.
237pub struct BatchCursorEnter<C, TInner, F> {
238    phantom: ::std::marker::PhantomData<TInner>,
239    cursor: C,
240    logic: F,
241}
242
243impl<C, TInner, F> BatchCursorEnter<C, TInner, F> {
244    fn new(cursor: C, logic: F) -> Self {
245        BatchCursorEnter {
246            phantom: ::std::marker::PhantomData,
247            cursor,
248            logic,
249        }
250    }
251}
252
253impl<TInner, C: Cursor, F> Cursor for BatchCursorEnter<C, TInner, F>
254where
255    TInner: Refines<C::Time>+Lattice,
256    F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>)->TInner,
257{
258    type Key<'a> = C::Key<'a>;
259    type Val<'a> = C::Val<'a>;
260    type Time = TInner;
261    type TimeGat<'a> = &'a TInner;
262    type Diff = C::Diff;
263    type DiffGat<'a> = C::DiffGat<'a>;
264
265    type Storage = BatchEnter<C::Storage, TInner, F>;
266
267    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
268    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
269
270    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
271    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
272
273    #[inline]
274    fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
275        let key = self.key(storage);
276        let val = self.val(storage);
277        let logic2 = &mut self.logic;
278        self.cursor.map_times(&storage.batch, |time, diff| {
279            logic(&logic2(key, val, time), diff)
280        })
281    }
282
283    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
284    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
285
286    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
287    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
288
289    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
290    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
291}