differential_dataflow/trace/wrappers/
enter.rs

1//! Wrappers to provide trace access to nested scopes.
2
3// use timely::progress::nested::product::Product;
4use timely::progress::timestamp::Refines;
5use timely::progress::Timestamp;
6use timely::progress::{Antichain, frontier::AntichainRef};
7
8use crate::lattice::Lattice;
9use crate::trace::{TraceReader, BatchReader, Description};
10use crate::trace::cursor::Cursor;
11
12/// Wrapper to provide trace to nested scope.
13pub struct TraceEnter<Tr, TInner>
14where
15    Tr: TraceReader,
16{
17    trace: Tr,
18    stash1: Antichain<Tr::Time>,
19    stash2: Antichain<TInner>,
20}
21
22impl<Tr,TInner> Clone for TraceEnter<Tr, TInner>
23where
24    Tr: TraceReader+Clone,
25{
26    fn clone(&self) -> Self {
27        TraceEnter {
28            trace: self.trace.clone(),
29            stash1: Antichain::new(),
30            stash2: Antichain::new(),
31        }
32    }
33}
34
35impl<Tr, TInner> TraceReader for TraceEnter<Tr, TInner>
36where
37    Tr: TraceReader,
38    Tr::Batch: Clone,
39    Tr::Time: Timestamp,
40    Tr::Diff: 'static,
41    TInner: Refines<Tr::Time>+Lattice,
42{
43    type Key<'a> = Tr::Key<'a>;
44    type KeyOwned = Tr::KeyOwned;
45    type Val<'a> = Tr::Val<'a>;
46    type ValOwned = Tr::ValOwned;
47    type Time = TInner;
48    type Diff = Tr::Diff;
49
50    type Batch = BatchEnter<Tr::Batch, TInner>;
51    type Storage = Tr::Storage;
52    type Cursor = CursorEnter<Tr::Cursor, TInner>;
53
54    fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
55        self.trace.map_batches(|batch| {
56            f(&Self::Batch::make_from(batch.clone()));
57        })
58    }
59
60    fn set_logical_compaction(&mut self, frontier: AntichainRef<TInner>) {
61        self.stash1.clear();
62        for time in frontier.iter() {
63            self.stash1.insert(time.clone().to_outer());
64        }
65        self.trace.set_logical_compaction(self.stash1.borrow());
66    }
67    fn get_logical_compaction(&mut self) -> AntichainRef<TInner> {
68        self.stash2.clear();
69        for time in self.trace.get_logical_compaction().iter() {
70            self.stash2.insert(TInner::to_inner(time.clone()));
71        }
72        self.stash2.borrow()
73    }
74
75    fn set_physical_compaction(&mut self, frontier: AntichainRef<TInner>) {
76        self.stash1.clear();
77        for time in frontier.iter() {
78            self.stash1.insert(time.clone().to_outer());
79        }
80        self.trace.set_physical_compaction(self.stash1.borrow());
81    }
82    fn get_physical_compaction(&mut self) -> AntichainRef<TInner> {
83        self.stash2.clear();
84        for time in self.trace.get_physical_compaction().iter() {
85            self.stash2.insert(TInner::to_inner(time.clone()));
86        }
87        self.stash2.borrow()
88    }
89
90    fn cursor_through(&mut self, upper: AntichainRef<TInner>) -> Option<(Self::Cursor, Self::Storage)> {
91        self.stash1.clear();
92        for time in upper.iter() {
93            self.stash1.insert(time.clone().to_outer());
94        }
95        self.trace.cursor_through(self.stash1.borrow()).map(|(x,y)| (CursorEnter::new(x), y))
96    }
97}
98
99impl<Tr, TInner> TraceEnter<Tr, TInner>
100where
101    Tr: TraceReader,
102    Tr::Time: Timestamp,
103    TInner: Refines<Tr::Time>+Lattice,
104{
105    /// Makes a new trace wrapper
106    pub fn make_from(trace: Tr) -> Self {
107        TraceEnter {
108            trace,
109            stash1: Antichain::new(),
110            stash2: Antichain::new(),
111        }
112    }
113}
114
115
116/// Wrapper to provide batch to nested scope.
117#[derive(Clone)]
118pub struct BatchEnter<B, TInner> {
119    batch: B,
120    description: Description<TInner>,
121}
122
123impl<B, TInner> BatchReader for BatchEnter<B, TInner>
124where
125    B: BatchReader,
126    B::Time: Timestamp,
127    TInner: Refines<B::Time>+Lattice,
128{
129    type Key<'a> = B::Key<'a>;
130    type KeyOwned = B::KeyOwned;
131    type Val<'a> = B::Val<'a>;
132    type ValOwned = B::ValOwned;
133    type Time = TInner;
134    type Diff = B::Diff;
135
136    type Cursor = BatchCursorEnter<B::Cursor, TInner>;
137
138    fn cursor(&self) -> Self::Cursor {
139        BatchCursorEnter::new(self.batch.cursor())
140    }
141    fn len(&self) -> usize { self.batch.len() }
142    fn description(&self) -> &Description<TInner> { &self.description }
143}
144
145impl<B, TInner> BatchEnter<B, TInner>
146where
147    B: BatchReader,
148    B::Time: Timestamp,
149    TInner: Refines<B::Time>+Lattice,
150{
151    /// Makes a new batch wrapper
152    pub fn make_from(batch: B) -> Self {
153        let lower: Vec<_> = batch.description().lower().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
154        let upper: Vec<_> = batch.description().upper().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
155        let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
156
157        BatchEnter {
158            batch,
159            description: Description::new(Antichain::from(lower), Antichain::from(upper), Antichain::from(since))
160        }
161    }
162}
163
164/// Wrapper to provide cursor to nested scope.
165pub struct CursorEnter<C, TInner> {
166    phantom: ::std::marker::PhantomData<TInner>,
167    cursor: C,
168}
169
170impl<C, TInner> CursorEnter<C, TInner> {
171    fn new(cursor: C) -> Self {
172        CursorEnter {
173            phantom: ::std::marker::PhantomData,
174            cursor,
175        }
176    }
177}
178
179impl<C, TInner> Cursor for CursorEnter<C, TInner>
180where
181    C: Cursor,
182    C::Time: Timestamp,
183    TInner: Refines<C::Time>+Lattice,
184{
185    type Key<'a> = C::Key<'a>;
186    type KeyOwned = C::KeyOwned;
187    type Val<'a> = C::Val<'a>;
188    type ValOwned = C::ValOwned;
189    type Time = TInner;
190    type Diff = C::Diff;
191
192    type Storage = C::Storage;
193
194    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
195    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
196
197    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
198    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
199
200    #[inline]
201    fn map_times<L: FnMut(&TInner, &Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L) {
202        self.cursor.map_times(storage, |time, diff| {
203            logic(&TInner::to_inner(time.clone()), diff)
204        })
205    }
206
207    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
208    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
209
210    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
211    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
212
213    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
214    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
215}
216
217
218
219/// Wrapper to provide cursor to nested scope.
220pub struct BatchCursorEnter<C, TInner> {
221    phantom: ::std::marker::PhantomData<TInner>,
222    cursor: C,
223}
224
225impl<C, TInner> BatchCursorEnter<C, TInner> {
226    fn new(cursor: C) -> Self {
227        BatchCursorEnter {
228            phantom: ::std::marker::PhantomData,
229            cursor,
230        }
231    }
232}
233
234impl<TInner, C: Cursor> Cursor for BatchCursorEnter<C, TInner>
235where
236    C::Time: Timestamp,
237    TInner: Refines<C::Time>+Lattice,
238{
239    type Key<'a> = C::Key<'a>;
240    type KeyOwned = C::KeyOwned;
241    type Val<'a> = C::Val<'a>;
242    type ValOwned = C::ValOwned;
243    type Time = TInner;
244    type Diff = C::Diff;
245
246    type Storage = BatchEnter<C::Storage, TInner>;
247
248    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
249    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
250
251    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
252    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
253
254    #[inline]
255    fn map_times<L: FnMut(&TInner, &Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L) {
256        self.cursor.map_times(&storage.batch, |time, diff| {
257            logic(&TInner::to_inner(time.clone()), diff)
258        })
259    }
260
261    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
262    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
263
264    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
265    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
266
267    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
268    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
269}