palimpsest_dataflow/trace/wrappers/
frontier.rs1use timely::progress::{frontier::AntichainRef, Antichain};
10
11use crate::lattice::Lattice;
12use crate::trace::cursor::Cursor;
13use crate::trace::{BatchReader, Description, TraceReader};
14
15pub struct TraceFrontier<Tr: TraceReader> {
17 trace: Tr,
18 since: Antichain<Tr::Time>,
20 until: Antichain<Tr::Time>,
22}
23
24impl<Tr: TraceReader + Clone> Clone for TraceFrontier<Tr> {
25 fn clone(&self) -> Self {
26 TraceFrontier {
27 trace: self.trace.clone(),
28 since: self.since.clone(),
29 until: self.until.clone(),
30 }
31 }
32}
33
34impl<Tr: TraceReader> WithLayout for TraceFrontier<Tr> {
35 type Layout = (
36 <Tr::Layout as Layout>::KeyContainer,
37 <Tr::Layout as Layout>::ValContainer,
38 Vec<Tr::Time>,
39 <Tr::Layout as Layout>::DiffContainer,
40 <Tr::Layout as Layout>::OffsetContainer,
41 );
42}
43
44impl<Tr: TraceReader> TraceReader for TraceFrontier<Tr> {
45 type Batch = BatchFrontier<Tr::Batch>;
46 type Storage = Tr::Storage;
47 type Cursor = CursorFrontier<Tr::Cursor, Tr::Time>;
48
49 fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
50 let since = self.since.borrow();
51 let until = self.until.borrow();
52 self.trace
53 .map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), since, until)))
54 }
55
56 fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
57 self.trace.set_logical_compaction(frontier)
58 }
59 fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
60 self.trace.get_logical_compaction()
61 }
62
63 fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
64 self.trace.set_physical_compaction(frontier)
65 }
66 fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
67 self.trace.get_physical_compaction()
68 }
69
70 fn cursor_through(
71 &mut self,
72 upper: AntichainRef<'_, Tr::Time>,
73 ) -> Option<(Self::Cursor, Self::Storage)> {
74 let since = self.since.borrow();
75 let until = self.until.borrow();
76 self.trace
77 .cursor_through(upper)
78 .map(|(x, y)| (CursorFrontier::new(x, since, until), y))
79 }
80}
81
82impl<Tr: TraceReader> TraceFrontier<Tr> {
83 pub fn make_from(
85 trace: Tr,
86 since: AntichainRef<'_, Tr::Time>,
87 until: AntichainRef<'_, Tr::Time>,
88 ) -> Self {
89 TraceFrontier {
90 trace,
91 since: since.to_owned(),
92 until: until.to_owned(),
93 }
94 }
95}
96
97#[derive(Clone)]
99pub struct BatchFrontier<B: BatchReader> {
100 batch: B,
101 since: Antichain<B::Time>,
102 until: Antichain<B::Time>,
103}
104
105impl<B: BatchReader> WithLayout for BatchFrontier<B> {
106 type Layout = (
107 <B::Layout as Layout>::KeyContainer,
108 <B::Layout as Layout>::ValContainer,
109 Vec<B::Time>,
110 <B::Layout as Layout>::DiffContainer,
111 <B::Layout as Layout>::OffsetContainer,
112 );
113}
114
115impl<B: BatchReader> BatchReader for BatchFrontier<B> {
116 type Cursor = BatchCursorFrontier<B::Cursor>;
117
118 fn cursor(&self) -> Self::Cursor {
119 BatchCursorFrontier::new(
120 self.batch.cursor(),
121 self.since.borrow(),
122 self.until.borrow(),
123 )
124 }
125 fn len(&self) -> usize {
126 self.batch.len()
127 }
128 fn description(&self) -> &Description<B::Time> {
129 self.batch.description()
130 }
131}
132
133impl<B: BatchReader> BatchFrontier<B> {
134 pub fn make_from(batch: B, since: AntichainRef<B::Time>, until: AntichainRef<B::Time>) -> Self {
136 BatchFrontier {
137 batch,
138 since: since.to_owned(),
139 until: until.to_owned(),
140 }
141 }
142}
143
144pub struct CursorFrontier<C, T> {
146 cursor: C,
147 since: Antichain<T>,
148 until: Antichain<T>,
149}
150
151use crate::trace::implementations::{Layout, WithLayout};
152impl<C: Cursor> WithLayout for CursorFrontier<C, C::Time> {
153 type Layout = (
154 <C::Layout as Layout>::KeyContainer,
155 <C::Layout as Layout>::ValContainer,
156 Vec<C::Time>,
157 <C::Layout as Layout>::DiffContainer,
158 <C::Layout as Layout>::OffsetContainer,
159 );
160}
161
162impl<C, T: Clone> CursorFrontier<C, T> {
163 fn new(cursor: C, since: AntichainRef<T>, until: AntichainRef<T>) -> Self {
164 CursorFrontier {
165 cursor,
166 since: since.to_owned(),
167 until: until.to_owned(),
168 }
169 }
170}
171
172impl<C: Cursor> Cursor for CursorFrontier<C, C::Time> {
173 type Storage = C::Storage;
174
175 #[inline]
176 fn key_valid(&self, storage: &Self::Storage) -> bool {
177 self.cursor.key_valid(storage)
178 }
179 #[inline]
180 fn val_valid(&self, storage: &Self::Storage) -> bool {
181 self.cursor.val_valid(storage)
182 }
183
184 #[inline]
185 fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
186 self.cursor.key(storage)
187 }
188 #[inline]
189 fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
190 self.cursor.val(storage)
191 }
192
193 #[inline]
194 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
195 self.cursor.get_key(storage)
196 }
197 #[inline]
198 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
199 self.cursor.get_val(storage)
200 }
201
202 #[inline]
203 fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(
204 &mut self,
205 storage: &Self::Storage,
206 mut logic: L,
207 ) {
208 let since = self.since.borrow();
209 let until = self.until.borrow();
210 let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
211 self.cursor.map_times(storage, |time, diff| {
212 C::clone_time_onto(time, &mut temp);
213 temp.advance_by(since);
214 if !until.less_equal(&temp) {
215 logic(&temp, diff);
216 }
217 })
218 }
219
220 #[inline]
221 fn step_key(&mut self, storage: &Self::Storage) {
222 self.cursor.step_key(storage)
223 }
224 #[inline]
225 fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
226 self.cursor.seek_key(storage, key)
227 }
228
229 #[inline]
230 fn step_val(&mut self, storage: &Self::Storage) {
231 self.cursor.step_val(storage)
232 }
233 #[inline]
234 fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
235 self.cursor.seek_val(storage, val)
236 }
237
238 #[inline]
239 fn rewind_keys(&mut self, storage: &Self::Storage) {
240 self.cursor.rewind_keys(storage)
241 }
242 #[inline]
243 fn rewind_vals(&mut self, storage: &Self::Storage) {
244 self.cursor.rewind_vals(storage)
245 }
246}
247
248pub struct BatchCursorFrontier<C: Cursor> {
250 cursor: C,
251 since: Antichain<C::Time>,
252 until: Antichain<C::Time>,
253}
254
255impl<C: Cursor> WithLayout for BatchCursorFrontier<C> {
256 type Layout = (
257 <C::Layout as Layout>::KeyContainer,
258 <C::Layout as Layout>::ValContainer,
259 Vec<C::Time>,
260 <C::Layout as Layout>::DiffContainer,
261 <C::Layout as Layout>::OffsetContainer,
262 );
263}
264
265impl<C: Cursor> BatchCursorFrontier<C> {
266 fn new(cursor: C, since: AntichainRef<C::Time>, until: AntichainRef<C::Time>) -> Self {
267 BatchCursorFrontier {
268 cursor,
269 since: since.to_owned(),
270 until: until.to_owned(),
271 }
272 }
273}
274
275impl<C: Cursor<Storage: BatchReader>> Cursor for BatchCursorFrontier<C> {
276 type Storage = BatchFrontier<C::Storage>;
277
278 #[inline]
279 fn key_valid(&self, storage: &Self::Storage) -> bool {
280 self.cursor.key_valid(&storage.batch)
281 }
282 #[inline]
283 fn val_valid(&self, storage: &Self::Storage) -> bool {
284 self.cursor.val_valid(&storage.batch)
285 }
286
287 #[inline]
288 fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
289 self.cursor.key(&storage.batch)
290 }
291 #[inline]
292 fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
293 self.cursor.val(&storage.batch)
294 }
295
296 #[inline]
297 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
298 self.cursor.get_key(&storage.batch)
299 }
300 #[inline]
301 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
302 self.cursor.get_val(&storage.batch)
303 }
304
305 #[inline]
306 fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(
307 &mut self,
308 storage: &Self::Storage,
309 mut logic: L,
310 ) {
311 let since = self.since.borrow();
312 let until = self.until.borrow();
313 let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
314 self.cursor.map_times(&storage.batch, |time, diff| {
315 C::clone_time_onto(time, &mut temp);
316 temp.advance_by(since);
317 if !until.less_equal(&temp) {
318 logic(&temp, diff);
319 }
320 })
321 }
322
323 #[inline]
324 fn step_key(&mut self, storage: &Self::Storage) {
325 self.cursor.step_key(&storage.batch)
326 }
327 #[inline]
328 fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
329 self.cursor.seek_key(&storage.batch, key)
330 }
331
332 #[inline]
333 fn step_val(&mut self, storage: &Self::Storage) {
334 self.cursor.step_val(&storage.batch)
335 }
336 #[inline]
337 fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
338 self.cursor.seek_val(&storage.batch, val)
339 }
340
341 #[inline]
342 fn rewind_keys(&mut self, storage: &Self::Storage) {
343 self.cursor.rewind_keys(&storage.batch)
344 }
345 #[inline]
346 fn rewind_vals(&mut self, storage: &Self::Storage) {
347 self.cursor.rewind_vals(&storage.batch)
348 }
349}