palimpsest_dataflow/trace/wrappers/
enter.rs1use timely::progress::timestamp::Refines;
5use timely::progress::{frontier::AntichainRef, Antichain};
6
7use crate::lattice::Lattice;
8use crate::trace::cursor::Cursor;
9use crate::trace::{BatchReader, Description, TraceReader};
10
11pub struct TraceEnter<Tr: TraceReader, TInner> {
13 trace: Tr,
14 stash1: Antichain<Tr::Time>,
15 stash2: Antichain<TInner>,
16}
17
18impl<Tr: TraceReader + Clone, TInner> Clone for TraceEnter<Tr, TInner> {
19 fn clone(&self) -> Self {
20 TraceEnter {
21 trace: self.trace.clone(),
22 stash1: Antichain::new(),
23 stash2: Antichain::new(),
24 }
25 }
26}
27
28impl<Tr, TInner> WithLayout for TraceEnter<Tr, TInner>
29where
30 Tr: TraceReader<Batch: Clone>,
31 TInner: Refines<Tr::Time> + Lattice,
32{
33 type Layout = (
34 <Tr::Layout as Layout>::KeyContainer,
35 <Tr::Layout as Layout>::ValContainer,
36 Vec<TInner>,
37 <Tr::Layout as Layout>::DiffContainer,
38 <Tr::Layout as Layout>::OffsetContainer,
39 );
40}
41
42impl<Tr, TInner> TraceReader for TraceEnter<Tr, TInner>
43where
44 Tr: TraceReader<Batch: Clone>,
45 TInner: Refines<Tr::Time> + Lattice,
46{
47 type Batch = BatchEnter<Tr::Batch, TInner>;
48 type Storage = Tr::Storage;
49 type Cursor = CursorEnter<Tr::Cursor, TInner>;
50
51 fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
52 self.trace.map_batches(|batch| {
53 f(&Self::Batch::make_from(batch.clone()));
54 })
55 }
56
57 fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
58 self.stash1.clear();
59 for time in frontier.iter() {
60 self.stash1.insert(time.clone().to_outer());
61 }
62 self.trace.set_logical_compaction(self.stash1.borrow());
63 }
64 fn get_logical_compaction(&mut self) -> AntichainRef<'_, TInner> {
65 self.stash2.clear();
66 for time in self.trace.get_logical_compaction().iter() {
67 self.stash2.insert(TInner::to_inner(time.clone()));
68 }
69 self.stash2.borrow()
70 }
71
72 fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
73 self.stash1.clear();
74 for time in frontier.iter() {
75 self.stash1.insert(time.clone().to_outer());
76 }
77 self.trace.set_physical_compaction(self.stash1.borrow());
78 }
79 fn get_physical_compaction(&mut self) -> AntichainRef<'_, TInner> {
80 self.stash2.clear();
81 for time in self.trace.get_physical_compaction().iter() {
82 self.stash2.insert(TInner::to_inner(time.clone()));
83 }
84 self.stash2.borrow()
85 }
86
87 fn cursor_through(
88 &mut self,
89 upper: AntichainRef<TInner>,
90 ) -> Option<(Self::Cursor, Self::Storage)> {
91 self.stash1.clear();
92 for time in upper.iter() {
93 self.stash1.insert(time.clone().to_outer());
94 }
95 self.trace
96 .cursor_through(self.stash1.borrow())
97 .map(|(x, y)| (CursorEnter::new(x), y))
98 }
99}
100
101impl<Tr, TInner> TraceEnter<Tr, TInner>
102where
103 Tr: TraceReader,
104 TInner: Refines<Tr::Time> + Lattice,
105{
106 pub fn make_from(trace: Tr) -> Self {
108 TraceEnter {
109 trace,
110 stash1: Antichain::new(),
111 stash2: Antichain::new(),
112 }
113 }
114}
115
116#[derive(Clone)]
118pub struct BatchEnter<B, TInner> {
119 batch: B,
120 description: Description<TInner>,
121}
122
123impl<B, TInner> WithLayout for BatchEnter<B, TInner>
124where
125 B: BatchReader,
126 TInner: Refines<B::Time> + Lattice,
127{
128 type Layout = (
129 <B::Layout as Layout>::KeyContainer,
130 <B::Layout as Layout>::ValContainer,
131 Vec<TInner>,
132 <B::Layout as Layout>::DiffContainer,
133 <B::Layout as Layout>::OffsetContainer,
134 );
135}
136
137impl<B, TInner> BatchReader for BatchEnter<B, TInner>
138where
139 B: BatchReader,
140 TInner: Refines<B::Time> + Lattice,
141{
142 type Cursor = BatchCursorEnter<B::Cursor, TInner>;
143
144 fn cursor(&self) -> Self::Cursor {
145 BatchCursorEnter::new(self.batch.cursor())
146 }
147 fn len(&self) -> usize {
148 self.batch.len()
149 }
150 fn description(&self) -> &Description<TInner> {
151 &self.description
152 }
153}
154
155impl<B, TInner> BatchEnter<B, TInner>
156where
157 B: BatchReader,
158 TInner: Refines<B::Time> + Lattice,
159{
160 pub fn make_from(batch: B) -> Self {
162 let lower: Vec<_> = batch
163 .description()
164 .lower()
165 .elements()
166 .iter()
167 .map(|x| TInner::to_inner(x.clone()))
168 .collect();
169 let upper: Vec<_> = batch
170 .description()
171 .upper()
172 .elements()
173 .iter()
174 .map(|x| TInner::to_inner(x.clone()))
175 .collect();
176 let since: Vec<_> = batch
177 .description()
178 .since()
179 .elements()
180 .iter()
181 .map(|x| TInner::to_inner(x.clone()))
182 .collect();
183
184 BatchEnter {
185 batch,
186 description: Description::new(
187 Antichain::from(lower),
188 Antichain::from(upper),
189 Antichain::from(since),
190 ),
191 }
192 }
193}
194
195pub struct CursorEnter<C, TInner> {
197 phantom: ::std::marker::PhantomData<TInner>,
198 cursor: C,
199}
200
201use crate::trace::implementations::{Layout, WithLayout};
202impl<C, TInner> WithLayout for CursorEnter<C, TInner>
203where
204 C: Cursor,
205 TInner: Refines<C::Time> + Lattice,
206{
207 type Layout = (
208 <C::Layout as Layout>::KeyContainer,
209 <C::Layout as Layout>::ValContainer,
210 Vec<TInner>,
211 <C::Layout as Layout>::DiffContainer,
212 <C::Layout as Layout>::OffsetContainer,
213 );
214}
215
216impl<C, TInner> CursorEnter<C, TInner> {
217 fn new(cursor: C) -> Self {
218 CursorEnter {
219 phantom: ::std::marker::PhantomData,
220 cursor,
221 }
222 }
223}
224
225impl<C, TInner> Cursor for CursorEnter<C, TInner>
226where
227 C: Cursor,
228 TInner: Refines<C::Time> + Lattice,
229{
230 type Storage = C::Storage;
231
232 #[inline]
233 fn key_valid(&self, storage: &Self::Storage) -> bool {
234 self.cursor.key_valid(storage)
235 }
236 #[inline]
237 fn val_valid(&self, storage: &Self::Storage) -> bool {
238 self.cursor.val_valid(storage)
239 }
240
241 #[inline]
242 fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
243 self.cursor.key(storage)
244 }
245 #[inline]
246 fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
247 self.cursor.val(storage)
248 }
249
250 #[inline]
251 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
252 self.cursor.get_key(storage)
253 }
254 #[inline]
255 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
256 self.cursor.get_val(storage)
257 }
258
259 #[inline]
260 fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(
261 &mut self,
262 storage: &Self::Storage,
263 mut logic: L,
264 ) {
265 self.cursor.map_times(storage, |time, diff| {
266 logic(&TInner::to_inner(C::owned_time(time)), diff)
267 })
268 }
269
270 #[inline]
271 fn step_key(&mut self, storage: &Self::Storage) {
272 self.cursor.step_key(storage)
273 }
274 #[inline]
275 fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
276 self.cursor.seek_key(storage, key)
277 }
278
279 #[inline]
280 fn step_val(&mut self, storage: &Self::Storage) {
281 self.cursor.step_val(storage)
282 }
283 #[inline]
284 fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
285 self.cursor.seek_val(storage, val)
286 }
287
288 #[inline]
289 fn rewind_keys(&mut self, storage: &Self::Storage) {
290 self.cursor.rewind_keys(storage)
291 }
292 #[inline]
293 fn rewind_vals(&mut self, storage: &Self::Storage) {
294 self.cursor.rewind_vals(storage)
295 }
296}
297
298pub struct BatchCursorEnter<C, TInner> {
300 phantom: ::std::marker::PhantomData<TInner>,
301 cursor: C,
302}
303
304impl<C, TInner> BatchCursorEnter<C, TInner> {
305 fn new(cursor: C) -> Self {
306 BatchCursorEnter {
307 phantom: ::std::marker::PhantomData,
308 cursor,
309 }
310 }
311}
312
313impl<C, TInner> WithLayout for BatchCursorEnter<C, TInner>
314where
315 C: Cursor,
316 TInner: Refines<C::Time> + Lattice,
317{
318 type Layout = (
319 <C::Layout as Layout>::KeyContainer,
320 <C::Layout as Layout>::ValContainer,
321 Vec<TInner>,
322 <C::Layout as Layout>::DiffContainer,
323 <C::Layout as Layout>::OffsetContainer,
324 );
325}
326
327impl<TInner, C: Cursor> Cursor for BatchCursorEnter<C, TInner>
328where
329 TInner: Refines<C::Time> + Lattice,
330{
331 type Storage = BatchEnter<C::Storage, TInner>;
332
333 #[inline]
334 fn key_valid(&self, storage: &Self::Storage) -> bool {
335 self.cursor.key_valid(&storage.batch)
336 }
337 #[inline]
338 fn val_valid(&self, storage: &Self::Storage) -> bool {
339 self.cursor.val_valid(&storage.batch)
340 }
341
342 #[inline]
343 fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
344 self.cursor.key(&storage.batch)
345 }
346 #[inline]
347 fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
348 self.cursor.val(&storage.batch)
349 }
350
351 #[inline]
352 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
353 self.cursor.get_key(&storage.batch)
354 }
355 #[inline]
356 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
357 self.cursor.get_val(&storage.batch)
358 }
359
360 #[inline]
361 fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(
362 &mut self,
363 storage: &Self::Storage,
364 mut logic: L,
365 ) {
366 self.cursor.map_times(&storage.batch, |time, diff| {
367 logic(&TInner::to_inner(C::owned_time(time)), diff)
368 })
369 }
370
371 #[inline]
372 fn step_key(&mut self, storage: &Self::Storage) {
373 self.cursor.step_key(&storage.batch)
374 }
375 #[inline]
376 fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
377 self.cursor.seek_key(&storage.batch, key)
378 }
379
380 #[inline]
381 fn step_val(&mut self, storage: &Self::Storage) {
382 self.cursor.step_val(&storage.batch)
383 }
384 #[inline]
385 fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
386 self.cursor.seek_val(&storage.batch, val)
387 }
388
389 #[inline]
390 fn rewind_keys(&mut self, storage: &Self::Storage) {
391 self.cursor.rewind_keys(&storage.batch)
392 }
393 #[inline]
394 fn rewind_vals(&mut self, storage: &Self::Storage) {
395 self.cursor.rewind_vals(&storage.batch)
396 }
397}