// Source: palimpsest_dataflow/trace/wrappers/enter_at.rs

//! Wrappers to provide trace access to nested scopes.
2
3use timely::progress::timestamp::Refines;
4use timely::progress::{frontier::AntichainRef, Antichain};
5
6use crate::lattice::Lattice;
7use crate::trace::cursor::Cursor;
8use crate::trace::{BatchReader, Description, TraceReader};
9
10/// Wrapper to provide trace to nested scope.
11///
12/// Each wrapped update is presented with a timestamp determined by `logic`.
13///
14/// At the same time, we require a method `prior` that can "invert" timestamps,
15/// and which will be applied to compaction frontiers as they are communicated
16/// back to the wrapped traces. A better explanation is pending, and until that
17/// happens use this construct at your own peril!
18pub struct TraceEnter<Tr: TraceReader, TInner, F, G> {
19    trace: Tr,
20    stash1: Antichain<Tr::Time>,
21    stash2: Antichain<TInner>,
22    logic: F,
23    prior: G,
24}
25
26impl<Tr, TInner, F, G> Clone for TraceEnter<Tr, TInner, F, G>
27where
28    Tr: TraceReader + Clone,
29    F: Clone,
30    G: Clone,
31{
32    fn clone(&self) -> Self {
33        TraceEnter {
34            trace: self.trace.clone(),
35            stash1: Antichain::new(),
36            stash2: Antichain::new(),
37            logic: self.logic.clone(),
38            prior: self.prior.clone(),
39        }
40    }
41}
42
43impl<Tr, TInner, F, G> WithLayout for TraceEnter<Tr, TInner, F, G>
44where
45    Tr: TraceReader<Batch: Clone>,
46    TInner: Refines<Tr::Time> + Lattice,
47    F: Clone,
48    G: Clone,
49{
50    type Layout = (
51        <Tr::Layout as Layout>::KeyContainer,
52        <Tr::Layout as Layout>::ValContainer,
53        Vec<TInner>,
54        <Tr::Layout as Layout>::DiffContainer,
55        <Tr::Layout as Layout>::OffsetContainer,
56    );
57}
58
59impl<Tr, TInner, F, G> TraceReader for TraceEnter<Tr, TInner, F, G>
60where
61    Tr: TraceReader<Batch: Clone>,
62    TInner: Refines<Tr::Time> + Lattice,
63    F: 'static,
64    F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>) -> TInner + Clone,
65    G: FnMut(&TInner) -> Tr::Time + Clone + 'static,
66{
67    type Batch = BatchEnter<Tr::Batch, TInner, F>;
68    type Storage = Tr::Storage;
69    type Cursor = CursorEnter<Tr::Cursor, TInner, F>;
70
71    fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
72        let logic = self.logic.clone();
73        self.trace.map_batches(|batch| {
74            f(&Self::Batch::make_from(batch.clone(), logic.clone()));
75        })
76    }
77
78    fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
79        self.stash1.clear();
80        for time in frontier.iter() {
81            self.stash1.insert((self.prior)(time));
82        }
83        self.trace.set_logical_compaction(self.stash1.borrow());
84    }
85    fn get_logical_compaction(&mut self) -> AntichainRef<'_, TInner> {
86        self.stash2.clear();
87        for time in self.trace.get_logical_compaction().iter() {
88            self.stash2.insert(TInner::to_inner(time.clone()));
89        }
90        self.stash2.borrow()
91    }
92
93    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
94        self.stash1.clear();
95        for time in frontier.iter() {
96            self.stash1.insert((self.prior)(time));
97        }
98        self.trace.set_physical_compaction(self.stash1.borrow());
99    }
100    fn get_physical_compaction(&mut self) -> AntichainRef<'_, TInner> {
101        self.stash2.clear();
102        for time in self.trace.get_physical_compaction().iter() {
103            self.stash2.insert(TInner::to_inner(time.clone()));
104        }
105        self.stash2.borrow()
106    }
107
108    fn cursor_through(
109        &mut self,
110        upper: AntichainRef<TInner>,
111    ) -> Option<(Self::Cursor, Self::Storage)> {
112        self.stash1.clear();
113        for time in upper.iter() {
114            self.stash1.insert(time.clone().to_outer());
115        }
116        self.trace
117            .cursor_through(self.stash1.borrow())
118            .map(|(x, y)| (CursorEnter::new(x, self.logic.clone()), y))
119    }
120}
121
122impl<Tr, TInner, F, G> TraceEnter<Tr, TInner, F, G>
123where
124    Tr: TraceReader,
125    TInner: Refines<Tr::Time> + Lattice,
126{
127    /// Makes a new trace wrapper
128    pub fn make_from(trace: Tr, logic: F, prior: G) -> Self {
129        TraceEnter {
130            trace,
131            stash1: Antichain::new(),
132            stash2: Antichain::new(),
133            logic,
134            prior,
135        }
136    }
137}
138
139/// Wrapper to provide batch to nested scope.
140#[derive(Clone)]
141pub struct BatchEnter<B, TInner, F> {
142    batch: B,
143    description: Description<TInner>,
144    logic: F,
145}
146
147impl<B, TInner, F> WithLayout for BatchEnter<B, TInner, F>
148where
149    B: BatchReader,
150    TInner: Refines<B::Time> + Lattice,
151{
152    type Layout = (
153        <B::Layout as Layout>::KeyContainer,
154        <B::Layout as Layout>::ValContainer,
155        Vec<TInner>,
156        <B::Layout as Layout>::DiffContainer,
157        <B::Layout as Layout>::OffsetContainer,
158    );
159}
160
161use crate::trace::implementations::LayoutExt;
162impl<B, TInner, F> BatchReader for BatchEnter<B, TInner, F>
163where
164    B: BatchReader,
165    TInner: Refines<B::Time> + Lattice,
166    F: FnMut(B::Key<'_>, <B::Cursor as LayoutExt>::Val<'_>, B::TimeGat<'_>) -> TInner + Clone,
167{
168    type Cursor = BatchCursorEnter<B::Cursor, TInner, F>;
169
170    fn cursor(&self) -> Self::Cursor {
171        BatchCursorEnter::new(self.batch.cursor(), self.logic.clone())
172    }
173    fn len(&self) -> usize {
174        self.batch.len()
175    }
176    fn description(&self) -> &Description<TInner> {
177        &self.description
178    }
179}
180
181impl<B, TInner, F> BatchEnter<B, TInner, F>
182where
183    B: BatchReader,
184    TInner: Refines<B::Time> + Lattice,
185{
186    /// Makes a new batch wrapper
187    pub fn make_from(batch: B, logic: F) -> Self {
188        let lower: Vec<_> = batch
189            .description()
190            .lower()
191            .elements()
192            .iter()
193            .map(|x| TInner::to_inner(x.clone()))
194            .collect();
195        let upper: Vec<_> = batch
196            .description()
197            .upper()
198            .elements()
199            .iter()
200            .map(|x| TInner::to_inner(x.clone()))
201            .collect();
202        let since: Vec<_> = batch
203            .description()
204            .since()
205            .elements()
206            .iter()
207            .map(|x| TInner::to_inner(x.clone()))
208            .collect();
209
210        BatchEnter {
211            batch,
212            description: Description::new(
213                Antichain::from(lower),
214                Antichain::from(upper),
215                Antichain::from(since),
216            ),
217            logic,
218        }
219    }
220}
221
/// Presents a cursor from an outer scope inside a nested scope.
///
/// This is the cursor type handed out by `TraceEnter`.
pub struct CursorEnter<C, TInner, F> {
    /// Records the inner timestamp type; no value of it is stored.
    phantom: ::std::marker::PhantomData<TInner>,
    /// The wrapped cursor.
    cursor: C,
    /// Produces the inner timestamp for a wrapped update.
    logic: F,
}
228
229use crate::trace::implementations::{Layout, WithLayout};
230impl<C, TInner, F> WithLayout for CursorEnter<C, TInner, F>
231where
232    C: Cursor,
233    TInner: Refines<C::Time> + Lattice,
234{
235    type Layout = (
236        <C::Layout as Layout>::KeyContainer,
237        <C::Layout as Layout>::ValContainer,
238        Vec<TInner>,
239        <C::Layout as Layout>::DiffContainer,
240        <C::Layout as Layout>::OffsetContainer,
241    );
242}
243
244impl<C, TInner, F> CursorEnter<C, TInner, F> {
245    fn new(cursor: C, logic: F) -> Self {
246        CursorEnter {
247            phantom: ::std::marker::PhantomData,
248            cursor,
249            logic,
250        }
251    }
252}
253
254impl<C, TInner, F> Cursor for CursorEnter<C, TInner, F>
255where
256    C: Cursor,
257    TInner: Refines<C::Time> + Lattice,
258    F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>) -> TInner,
259{
260    type Storage = C::Storage;
261
262    #[inline]
263    fn key_valid(&self, storage: &Self::Storage) -> bool {
264        self.cursor.key_valid(storage)
265    }
266    #[inline]
267    fn val_valid(&self, storage: &Self::Storage) -> bool {
268        self.cursor.val_valid(storage)
269    }
270
271    #[inline]
272    fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
273        self.cursor.key(storage)
274    }
275    #[inline]
276    fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
277        self.cursor.val(storage)
278    }
279
280    #[inline]
281    fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
282        self.cursor.get_key(storage)
283    }
284    #[inline]
285    fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
286        self.cursor.get_val(storage)
287    }
288
289    #[inline]
290    fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(
291        &mut self,
292        storage: &Self::Storage,
293        mut logic: L,
294    ) {
295        let key = self.key(storage);
296        let val = self.val(storage);
297        let logic2 = &mut self.logic;
298        self.cursor
299            .map_times(storage, |time, diff| logic(&logic2(key, val, time), diff))
300    }
301
302    #[inline]
303    fn step_key(&mut self, storage: &Self::Storage) {
304        self.cursor.step_key(storage)
305    }
306    #[inline]
307    fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
308        self.cursor.seek_key(storage, key)
309    }
310
311    #[inline]
312    fn step_val(&mut self, storage: &Self::Storage) {
313        self.cursor.step_val(storage)
314    }
315    #[inline]
316    fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
317        self.cursor.seek_val(storage, val)
318    }
319
320    #[inline]
321    fn rewind_keys(&mut self, storage: &Self::Storage) {
322        self.cursor.rewind_keys(storage)
323    }
324    #[inline]
325    fn rewind_vals(&mut self, storage: &Self::Storage) {
326        self.cursor.rewind_vals(storage)
327    }
328}
329
/// Presents a batch cursor from an outer scope inside a nested scope.
///
/// This is the cursor type handed out by `BatchEnter`.
pub struct BatchCursorEnter<C, TInner, F> {
    /// Records the inner timestamp type; no value of it is stored.
    phantom: ::std::marker::PhantomData<TInner>,
    /// The wrapped cursor.
    cursor: C,
    /// Produces the inner timestamp for a wrapped update.
    logic: F,
}
336
337impl<C, TInner, F> WithLayout for BatchCursorEnter<C, TInner, F>
338where
339    C: Cursor,
340    TInner: Refines<C::Time> + Lattice,
341{
342    type Layout = (
343        <C::Layout as Layout>::KeyContainer,
344        <C::Layout as Layout>::ValContainer,
345        Vec<TInner>,
346        <C::Layout as Layout>::DiffContainer,
347        <C::Layout as Layout>::OffsetContainer,
348    );
349}
350
351impl<C, TInner, F> BatchCursorEnter<C, TInner, F> {
352    fn new(cursor: C, logic: F) -> Self {
353        BatchCursorEnter {
354            phantom: ::std::marker::PhantomData,
355            cursor,
356            logic,
357        }
358    }
359}
360
361impl<TInner, C: Cursor, F> Cursor for BatchCursorEnter<C, TInner, F>
362where
363    TInner: Refines<C::Time> + Lattice,
364    F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>) -> TInner,
365{
366    type Storage = BatchEnter<C::Storage, TInner, F>;
367
368    #[inline]
369    fn key_valid(&self, storage: &Self::Storage) -> bool {
370        self.cursor.key_valid(&storage.batch)
371    }
372    #[inline]
373    fn val_valid(&self, storage: &Self::Storage) -> bool {
374        self.cursor.val_valid(&storage.batch)
375    }
376
377    #[inline]
378    fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
379        self.cursor.key(&storage.batch)
380    }
381    #[inline]
382    fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
383        self.cursor.val(&storage.batch)
384    }
385
386    #[inline]
387    fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
388        self.cursor.get_key(&storage.batch)
389    }
390    #[inline]
391    fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
392        self.cursor.get_val(&storage.batch)
393    }
394
395    #[inline]
396    fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(
397        &mut self,
398        storage: &Self::Storage,
399        mut logic: L,
400    ) {
401        let key = self.key(storage);
402        let val = self.val(storage);
403        let logic2 = &mut self.logic;
404        self.cursor.map_times(&storage.batch, |time, diff| {
405            logic(&logic2(key, val, time), diff)
406        })
407    }
408
409    #[inline]
410    fn step_key(&mut self, storage: &Self::Storage) {
411        self.cursor.step_key(&storage.batch)
412    }
413    #[inline]
414    fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
415        self.cursor.seek_key(&storage.batch, key)
416    }
417
418    #[inline]
419    fn step_val(&mut self, storage: &Self::Storage) {
420        self.cursor.step_val(&storage.batch)
421    }
422    #[inline]
423    fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
424        self.cursor.seek_val(&storage.batch, val)
425    }
426
427    #[inline]
428    fn rewind_keys(&mut self, storage: &Self::Storage) {
429        self.cursor.rewind_keys(&storage.batch)
430    }
431    #[inline]
432    fn rewind_vals(&mut self, storage: &Self::Storage) {
433        self.cursor.rewind_vals(&storage.batch)
434    }
435}