Skip to main content

palimpsest_dataflow/trace/wrappers/
enter.rs

1//! Wrappers to provide trace access to nested scopes.
2
3// use timely::progress::nested::product::Product;
4use timely::progress::timestamp::Refines;
5use timely::progress::{frontier::AntichainRef, Antichain};
6
7use crate::lattice::Lattice;
8use crate::trace::cursor::Cursor;
9use crate::trace::{BatchReader, Description, TraceReader};
10
/// Wrapper to provide trace to nested scope.
///
/// Holds a trace reader `Tr` whose times are `Tr::Time` and exposes it with
/// times of type `TInner`. Frontiers that cross the boundary are converted
/// into the two scratch antichains kept alongside the trace.
pub struct TraceEnter<Tr: TraceReader, TInner> {
    /// The wrapped trace, addressed with `Tr::Time` times.
    trace: Tr,
    /// Scratch antichain of `Tr::Time` times: filled from a `TInner` frontier
    /// via `to_outer` before it is handed to the wrapped trace (compaction
    /// setters and `cursor_through`).
    stash1: Antichain<Tr::Time>,
    /// Scratch antichain of `TInner` times: filled from the wrapped trace's
    /// frontier via `to_inner` so the compaction getters can return a borrow.
    stash2: Antichain<TInner>,
}
17
18impl<Tr: TraceReader + Clone, TInner> Clone for TraceEnter<Tr, TInner> {
19    fn clone(&self) -> Self {
20        TraceEnter {
21            trace: self.trace.clone(),
22            stash1: Antichain::new(),
23            stash2: Antichain::new(),
24        }
25    }
26}
27
28impl<Tr, TInner> WithLayout for TraceEnter<Tr, TInner>
29where
30    Tr: TraceReader<Batch: Clone>,
31    TInner: Refines<Tr::Time> + Lattice,
32{
33    type Layout = (
34        <Tr::Layout as Layout>::KeyContainer,
35        <Tr::Layout as Layout>::ValContainer,
36        Vec<TInner>,
37        <Tr::Layout as Layout>::DiffContainer,
38        <Tr::Layout as Layout>::OffsetContainer,
39    );
40}
41
42impl<Tr, TInner> TraceReader for TraceEnter<Tr, TInner>
43where
44    Tr: TraceReader<Batch: Clone>,
45    TInner: Refines<Tr::Time> + Lattice,
46{
47    type Batch = BatchEnter<Tr::Batch, TInner>;
48    type Storage = Tr::Storage;
49    type Cursor = CursorEnter<Tr::Cursor, TInner>;
50
51    fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
52        self.trace.map_batches(|batch| {
53            f(&Self::Batch::make_from(batch.clone()));
54        })
55    }
56
57    fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
58        self.stash1.clear();
59        for time in frontier.iter() {
60            self.stash1.insert(time.clone().to_outer());
61        }
62        self.trace.set_logical_compaction(self.stash1.borrow());
63    }
64    fn get_logical_compaction(&mut self) -> AntichainRef<'_, TInner> {
65        self.stash2.clear();
66        for time in self.trace.get_logical_compaction().iter() {
67            self.stash2.insert(TInner::to_inner(time.clone()));
68        }
69        self.stash2.borrow()
70    }
71
72    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
73        self.stash1.clear();
74        for time in frontier.iter() {
75            self.stash1.insert(time.clone().to_outer());
76        }
77        self.trace.set_physical_compaction(self.stash1.borrow());
78    }
79    fn get_physical_compaction(&mut self) -> AntichainRef<'_, TInner> {
80        self.stash2.clear();
81        for time in self.trace.get_physical_compaction().iter() {
82            self.stash2.insert(TInner::to_inner(time.clone()));
83        }
84        self.stash2.borrow()
85    }
86
87    fn cursor_through(
88        &mut self,
89        upper: AntichainRef<TInner>,
90    ) -> Option<(Self::Cursor, Self::Storage)> {
91        self.stash1.clear();
92        for time in upper.iter() {
93            self.stash1.insert(time.clone().to_outer());
94        }
95        self.trace
96            .cursor_through(self.stash1.borrow())
97            .map(|(x, y)| (CursorEnter::new(x), y))
98    }
99}
100
101impl<Tr, TInner> TraceEnter<Tr, TInner>
102where
103    Tr: TraceReader,
104    TInner: Refines<Tr::Time> + Lattice,
105{
106    /// Makes a new trace wrapper
107    pub fn make_from(trace: Tr) -> Self {
108        TraceEnter {
109            trace,
110            stash1: Antichain::new(),
111            stash2: Antichain::new(),
112        }
113    }
114}
115
/// Wrapper to provide batch to nested scope.
///
/// Pairs a batch with a description expressed in `TInner` times, which is
/// what `description()` hands out for the wrapper.
#[derive(Clone)]
pub struct BatchEnter<B, TInner> {
    /// The wrapped batch; cursors and `len` are delegated to it.
    batch: B,
    /// Description of the wrapped batch with its lower, upper and since
    /// frontiers converted through `TInner::to_inner` (built in `make_from`).
    description: Description<TInner>,
}
122
123impl<B, TInner> WithLayout for BatchEnter<B, TInner>
124where
125    B: BatchReader,
126    TInner: Refines<B::Time> + Lattice,
127{
128    type Layout = (
129        <B::Layout as Layout>::KeyContainer,
130        <B::Layout as Layout>::ValContainer,
131        Vec<TInner>,
132        <B::Layout as Layout>::DiffContainer,
133        <B::Layout as Layout>::OffsetContainer,
134    );
135}
136
137impl<B, TInner> BatchReader for BatchEnter<B, TInner>
138where
139    B: BatchReader,
140    TInner: Refines<B::Time> + Lattice,
141{
142    type Cursor = BatchCursorEnter<B::Cursor, TInner>;
143
144    fn cursor(&self) -> Self::Cursor {
145        BatchCursorEnter::new(self.batch.cursor())
146    }
147    fn len(&self) -> usize {
148        self.batch.len()
149    }
150    fn description(&self) -> &Description<TInner> {
151        &self.description
152    }
153}
154
155impl<B, TInner> BatchEnter<B, TInner>
156where
157    B: BatchReader,
158    TInner: Refines<B::Time> + Lattice,
159{
160    /// Makes a new batch wrapper
161    pub fn make_from(batch: B) -> Self {
162        let lower: Vec<_> = batch
163            .description()
164            .lower()
165            .elements()
166            .iter()
167            .map(|x| TInner::to_inner(x.clone()))
168            .collect();
169        let upper: Vec<_> = batch
170            .description()
171            .upper()
172            .elements()
173            .iter()
174            .map(|x| TInner::to_inner(x.clone()))
175            .collect();
176        let since: Vec<_> = batch
177            .description()
178            .since()
179            .elements()
180            .iter()
181            .map(|x| TInner::to_inner(x.clone()))
182            .collect();
183
184        BatchEnter {
185            batch,
186            description: Description::new(
187                Antichain::from(lower),
188                Antichain::from(upper),
189                Antichain::from(since),
190            ),
191        }
192    }
193}
194
/// Wrapper to provide cursor to nested scope.
///
/// Navigation is delegated to the wrapped cursor; only the times passed to
/// the `map_times` callback are converted to `TInner`.
pub struct CursorEnter<C, TInner> {
    /// Zero-sized marker: `TInner` appears in no other field.
    phantom: ::std::marker::PhantomData<TInner>,
    /// The wrapped cursor.
    cursor: C,
}
200
201use crate::trace::implementations::{Layout, WithLayout};
202impl<C, TInner> WithLayout for CursorEnter<C, TInner>
203where
204    C: Cursor,
205    TInner: Refines<C::Time> + Lattice,
206{
207    type Layout = (
208        <C::Layout as Layout>::KeyContainer,
209        <C::Layout as Layout>::ValContainer,
210        Vec<TInner>,
211        <C::Layout as Layout>::DiffContainer,
212        <C::Layout as Layout>::OffsetContainer,
213    );
214}
215
216impl<C, TInner> CursorEnter<C, TInner> {
217    fn new(cursor: C) -> Self {
218        CursorEnter {
219            phantom: ::std::marker::PhantomData,
220            cursor,
221        }
222    }
223}
224
225impl<C, TInner> Cursor for CursorEnter<C, TInner>
226where
227    C: Cursor,
228    TInner: Refines<C::Time> + Lattice,
229{
230    type Storage = C::Storage;
231
232    #[inline]
233    fn key_valid(&self, storage: &Self::Storage) -> bool {
234        self.cursor.key_valid(storage)
235    }
236    #[inline]
237    fn val_valid(&self, storage: &Self::Storage) -> bool {
238        self.cursor.val_valid(storage)
239    }
240
241    #[inline]
242    fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
243        self.cursor.key(storage)
244    }
245    #[inline]
246    fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
247        self.cursor.val(storage)
248    }
249
250    #[inline]
251    fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
252        self.cursor.get_key(storage)
253    }
254    #[inline]
255    fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
256        self.cursor.get_val(storage)
257    }
258
259    #[inline]
260    fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(
261        &mut self,
262        storage: &Self::Storage,
263        mut logic: L,
264    ) {
265        self.cursor.map_times(storage, |time, diff| {
266            logic(&TInner::to_inner(C::owned_time(time)), diff)
267        })
268    }
269
270    #[inline]
271    fn step_key(&mut self, storage: &Self::Storage) {
272        self.cursor.step_key(storage)
273    }
274    #[inline]
275    fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
276        self.cursor.seek_key(storage, key)
277    }
278
279    #[inline]
280    fn step_val(&mut self, storage: &Self::Storage) {
281        self.cursor.step_val(storage)
282    }
283    #[inline]
284    fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
285        self.cursor.seek_val(storage, val)
286    }
287
288    #[inline]
289    fn rewind_keys(&mut self, storage: &Self::Storage) {
290        self.cursor.rewind_keys(storage)
291    }
292    #[inline]
293    fn rewind_vals(&mut self, storage: &Self::Storage) {
294        self.cursor.rewind_vals(storage)
295    }
296}
297
/// Wrapper to provide cursor to nested scope.
///
/// This is the cursor type produced by `BatchEnter::cursor`; its storage is a
/// `BatchEnter`, and every call is forwarded to the wrapped cursor using the
/// batch held inside that wrapper.
pub struct BatchCursorEnter<C, TInner> {
    /// Zero-sized marker: `TInner` appears in no other field.
    phantom: ::std::marker::PhantomData<TInner>,
    /// The wrapped batch cursor.
    cursor: C,
}
303
304impl<C, TInner> BatchCursorEnter<C, TInner> {
305    fn new(cursor: C) -> Self {
306        BatchCursorEnter {
307            phantom: ::std::marker::PhantomData,
308            cursor,
309        }
310    }
311}
312
313impl<C, TInner> WithLayout for BatchCursorEnter<C, TInner>
314where
315    C: Cursor,
316    TInner: Refines<C::Time> + Lattice,
317{
318    type Layout = (
319        <C::Layout as Layout>::KeyContainer,
320        <C::Layout as Layout>::ValContainer,
321        Vec<TInner>,
322        <C::Layout as Layout>::DiffContainer,
323        <C::Layout as Layout>::OffsetContainer,
324    );
325}
326
327impl<TInner, C: Cursor> Cursor for BatchCursorEnter<C, TInner>
328where
329    TInner: Refines<C::Time> + Lattice,
330{
331    type Storage = BatchEnter<C::Storage, TInner>;
332
333    #[inline]
334    fn key_valid(&self, storage: &Self::Storage) -> bool {
335        self.cursor.key_valid(&storage.batch)
336    }
337    #[inline]
338    fn val_valid(&self, storage: &Self::Storage) -> bool {
339        self.cursor.val_valid(&storage.batch)
340    }
341
342    #[inline]
343    fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
344        self.cursor.key(&storage.batch)
345    }
346    #[inline]
347    fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
348        self.cursor.val(&storage.batch)
349    }
350
351    #[inline]
352    fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
353        self.cursor.get_key(&storage.batch)
354    }
355    #[inline]
356    fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
357        self.cursor.get_val(&storage.batch)
358    }
359
360    #[inline]
361    fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(
362        &mut self,
363        storage: &Self::Storage,
364        mut logic: L,
365    ) {
366        self.cursor.map_times(&storage.batch, |time, diff| {
367            logic(&TInner::to_inner(C::owned_time(time)), diff)
368        })
369    }
370
371    #[inline]
372    fn step_key(&mut self, storage: &Self::Storage) {
373        self.cursor.step_key(&storage.batch)
374    }
375    #[inline]
376    fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
377        self.cursor.seek_key(&storage.batch, key)
378    }
379
380    #[inline]
381    fn step_val(&mut self, storage: &Self::Storage) {
382        self.cursor.step_val(&storage.batch)
383    }
384    #[inline]
385    fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
386        self.cursor.seek_val(&storage.batch, val)
387    }
388
389    #[inline]
390    fn rewind_keys(&mut self, storage: &Self::Storage) {
391        self.cursor.rewind_keys(&storage.batch)
392    }
393    #[inline]
394    fn rewind_vals(&mut self, storage: &Self::Storage) {
395        self.cursor.rewind_vals(&storage.batch)
396    }
397}