use timely::progress::timestamp::Refines;
use timely::progress::{frontier::AntichainRef, Antichain};

use crate::lattice::Lattice;
use crate::trace::cursor::Cursor;
use crate::trace::{BatchReader, Description, TraceReader};

10pub struct TraceEnter<Tr: TraceReader, TInner, F, G> {
19 trace: Tr,
20 stash1: Antichain<Tr::Time>,
21 stash2: Antichain<TInner>,
22 logic: F,
23 prior: G,
24}
25
26impl<Tr, TInner, F, G> Clone for TraceEnter<Tr, TInner, F, G>
27where
28 Tr: TraceReader + Clone,
29 F: Clone,
30 G: Clone,
31{
32 fn clone(&self) -> Self {
33 TraceEnter {
34 trace: self.trace.clone(),
35 stash1: Antichain::new(),
36 stash2: Antichain::new(),
37 logic: self.logic.clone(),
38 prior: self.prior.clone(),
39 }
40 }
41}
42
43impl<Tr, TInner, F, G> WithLayout for TraceEnter<Tr, TInner, F, G>
44where
45 Tr: TraceReader<Batch: Clone>,
46 TInner: Refines<Tr::Time> + Lattice,
47 F: Clone,
48 G: Clone,
49{
50 type Layout = (
51 <Tr::Layout as Layout>::KeyContainer,
52 <Tr::Layout as Layout>::ValContainer,
53 Vec<TInner>,
54 <Tr::Layout as Layout>::DiffContainer,
55 <Tr::Layout as Layout>::OffsetContainer,
56 );
57}
58
59impl<Tr, TInner, F, G> TraceReader for TraceEnter<Tr, TInner, F, G>
60where
61 Tr: TraceReader<Batch: Clone>,
62 TInner: Refines<Tr::Time> + Lattice,
63 F: 'static,
64 F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>) -> TInner + Clone,
65 G: FnMut(&TInner) -> Tr::Time + Clone + 'static,
66{
67 type Batch = BatchEnter<Tr::Batch, TInner, F>;
68 type Storage = Tr::Storage;
69 type Cursor = CursorEnter<Tr::Cursor, TInner, F>;
70
71 fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
72 let logic = self.logic.clone();
73 self.trace.map_batches(|batch| {
74 f(&Self::Batch::make_from(batch.clone(), logic.clone()));
75 })
76 }
77
78 fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
79 self.stash1.clear();
80 for time in frontier.iter() {
81 self.stash1.insert((self.prior)(time));
82 }
83 self.trace.set_logical_compaction(self.stash1.borrow());
84 }
85 fn get_logical_compaction(&mut self) -> AntichainRef<'_, TInner> {
86 self.stash2.clear();
87 for time in self.trace.get_logical_compaction().iter() {
88 self.stash2.insert(TInner::to_inner(time.clone()));
89 }
90 self.stash2.borrow()
91 }
92
93 fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
94 self.stash1.clear();
95 for time in frontier.iter() {
96 self.stash1.insert((self.prior)(time));
97 }
98 self.trace.set_physical_compaction(self.stash1.borrow());
99 }
100 fn get_physical_compaction(&mut self) -> AntichainRef<'_, TInner> {
101 self.stash2.clear();
102 for time in self.trace.get_physical_compaction().iter() {
103 self.stash2.insert(TInner::to_inner(time.clone()));
104 }
105 self.stash2.borrow()
106 }
107
108 fn cursor_through(
109 &mut self,
110 upper: AntichainRef<TInner>,
111 ) -> Option<(Self::Cursor, Self::Storage)> {
112 self.stash1.clear();
113 for time in upper.iter() {
114 self.stash1.insert(time.clone().to_outer());
115 }
116 self.trace
117 .cursor_through(self.stash1.borrow())
118 .map(|(x, y)| (CursorEnter::new(x, self.logic.clone()), y))
119 }
120}
121
122impl<Tr, TInner, F, G> TraceEnter<Tr, TInner, F, G>
123where
124 Tr: TraceReader,
125 TInner: Refines<Tr::Time> + Lattice,
126{
127 pub fn make_from(trace: Tr, logic: F, prior: G) -> Self {
129 TraceEnter {
130 trace,
131 stash1: Antichain::new(),
132 stash2: Antichain::new(),
133 logic,
134 prior,
135 }
136 }
137}
138
139#[derive(Clone)]
141pub struct BatchEnter<B, TInner, F> {
142 batch: B,
143 description: Description<TInner>,
144 logic: F,
145}
146
147impl<B, TInner, F> WithLayout for BatchEnter<B, TInner, F>
148where
149 B: BatchReader,
150 TInner: Refines<B::Time> + Lattice,
151{
152 type Layout = (
153 <B::Layout as Layout>::KeyContainer,
154 <B::Layout as Layout>::ValContainer,
155 Vec<TInner>,
156 <B::Layout as Layout>::DiffContainer,
157 <B::Layout as Layout>::OffsetContainer,
158 );
159}
160
161use crate::trace::implementations::LayoutExt;
162impl<B, TInner, F> BatchReader for BatchEnter<B, TInner, F>
163where
164 B: BatchReader,
165 TInner: Refines<B::Time> + Lattice,
166 F: FnMut(B::Key<'_>, <B::Cursor as LayoutExt>::Val<'_>, B::TimeGat<'_>) -> TInner + Clone,
167{
168 type Cursor = BatchCursorEnter<B::Cursor, TInner, F>;
169
170 fn cursor(&self) -> Self::Cursor {
171 BatchCursorEnter::new(self.batch.cursor(), self.logic.clone())
172 }
173 fn len(&self) -> usize {
174 self.batch.len()
175 }
176 fn description(&self) -> &Description<TInner> {
177 &self.description
178 }
179}
180
181impl<B, TInner, F> BatchEnter<B, TInner, F>
182where
183 B: BatchReader,
184 TInner: Refines<B::Time> + Lattice,
185{
186 pub fn make_from(batch: B, logic: F) -> Self {
188 let lower: Vec<_> = batch
189 .description()
190 .lower()
191 .elements()
192 .iter()
193 .map(|x| TInner::to_inner(x.clone()))
194 .collect();
195 let upper: Vec<_> = batch
196 .description()
197 .upper()
198 .elements()
199 .iter()
200 .map(|x| TInner::to_inner(x.clone()))
201 .collect();
202 let since: Vec<_> = batch
203 .description()
204 .since()
205 .elements()
206 .iter()
207 .map(|x| TInner::to_inner(x.clone()))
208 .collect();
209
210 BatchEnter {
211 batch,
212 description: Description::new(
213 Antichain::from(lower),
214 Antichain::from(upper),
215 Antichain::from(since),
216 ),
217 logic,
218 }
219 }
220}
221
/// Wraps a cursor so that the times it reports are `TInner` values computed by `logic`.
pub struct CursorEnter<C, TInner, F> {
    /// Ties the otherwise unused `TInner` parameter to the type.
    phantom: ::std::marker::PhantomData<TInner>,
    /// The wrapped cursor.
    cursor: C,
    /// Computes the `TInner` timestamp presented for each update.
    logic: F,
}

229use crate::trace::implementations::{Layout, WithLayout};
230impl<C, TInner, F> WithLayout for CursorEnter<C, TInner, F>
231where
232 C: Cursor,
233 TInner: Refines<C::Time> + Lattice,
234{
235 type Layout = (
236 <C::Layout as Layout>::KeyContainer,
237 <C::Layout as Layout>::ValContainer,
238 Vec<TInner>,
239 <C::Layout as Layout>::DiffContainer,
240 <C::Layout as Layout>::OffsetContainer,
241 );
242}
243
244impl<C, TInner, F> CursorEnter<C, TInner, F> {
245 fn new(cursor: C, logic: F) -> Self {
246 CursorEnter {
247 phantom: ::std::marker::PhantomData,
248 cursor,
249 logic,
250 }
251 }
252}
253
254impl<C, TInner, F> Cursor for CursorEnter<C, TInner, F>
255where
256 C: Cursor,
257 TInner: Refines<C::Time> + Lattice,
258 F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>) -> TInner,
259{
260 type Storage = C::Storage;
261
262 #[inline]
263 fn key_valid(&self, storage: &Self::Storage) -> bool {
264 self.cursor.key_valid(storage)
265 }
266 #[inline]
267 fn val_valid(&self, storage: &Self::Storage) -> bool {
268 self.cursor.val_valid(storage)
269 }
270
271 #[inline]
272 fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
273 self.cursor.key(storage)
274 }
275 #[inline]
276 fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
277 self.cursor.val(storage)
278 }
279
280 #[inline]
281 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
282 self.cursor.get_key(storage)
283 }
284 #[inline]
285 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
286 self.cursor.get_val(storage)
287 }
288
289 #[inline]
290 fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(
291 &mut self,
292 storage: &Self::Storage,
293 mut logic: L,
294 ) {
295 let key = self.key(storage);
296 let val = self.val(storage);
297 let logic2 = &mut self.logic;
298 self.cursor
299 .map_times(storage, |time, diff| logic(&logic2(key, val, time), diff))
300 }
301
302 #[inline]
303 fn step_key(&mut self, storage: &Self::Storage) {
304 self.cursor.step_key(storage)
305 }
306 #[inline]
307 fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
308 self.cursor.seek_key(storage, key)
309 }
310
311 #[inline]
312 fn step_val(&mut self, storage: &Self::Storage) {
313 self.cursor.step_val(storage)
314 }
315 #[inline]
316 fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
317 self.cursor.seek_val(storage, val)
318 }
319
320 #[inline]
321 fn rewind_keys(&mut self, storage: &Self::Storage) {
322 self.cursor.rewind_keys(storage)
323 }
324 #[inline]
325 fn rewind_vals(&mut self, storage: &Self::Storage) {
326 self.cursor.rewind_vals(storage)
327 }
328}
329
/// Wraps a batch cursor so that the times it reports are `TInner` values computed by `logic`.
pub struct BatchCursorEnter<C, TInner, F> {
    /// Ties the otherwise unused `TInner` parameter to the type.
    phantom: ::std::marker::PhantomData<TInner>,
    /// The wrapped cursor.
    cursor: C,
    /// Computes the `TInner` timestamp presented for each update.
    logic: F,
}

337impl<C, TInner, F> WithLayout for BatchCursorEnter<C, TInner, F>
338where
339 C: Cursor,
340 TInner: Refines<C::Time> + Lattice,
341{
342 type Layout = (
343 <C::Layout as Layout>::KeyContainer,
344 <C::Layout as Layout>::ValContainer,
345 Vec<TInner>,
346 <C::Layout as Layout>::DiffContainer,
347 <C::Layout as Layout>::OffsetContainer,
348 );
349}
350
351impl<C, TInner, F> BatchCursorEnter<C, TInner, F> {
352 fn new(cursor: C, logic: F) -> Self {
353 BatchCursorEnter {
354 phantom: ::std::marker::PhantomData,
355 cursor,
356 logic,
357 }
358 }
359}
360
361impl<TInner, C: Cursor, F> Cursor for BatchCursorEnter<C, TInner, F>
362where
363 TInner: Refines<C::Time> + Lattice,
364 F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>) -> TInner,
365{
366 type Storage = BatchEnter<C::Storage, TInner, F>;
367
368 #[inline]
369 fn key_valid(&self, storage: &Self::Storage) -> bool {
370 self.cursor.key_valid(&storage.batch)
371 }
372 #[inline]
373 fn val_valid(&self, storage: &Self::Storage) -> bool {
374 self.cursor.val_valid(&storage.batch)
375 }
376
377 #[inline]
378 fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
379 self.cursor.key(&storage.batch)
380 }
381 #[inline]
382 fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
383 self.cursor.val(&storage.batch)
384 }
385
386 #[inline]
387 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
388 self.cursor.get_key(&storage.batch)
389 }
390 #[inline]
391 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
392 self.cursor.get_val(&storage.batch)
393 }
394
395 #[inline]
396 fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(
397 &mut self,
398 storage: &Self::Storage,
399 mut logic: L,
400 ) {
401 let key = self.key(storage);
402 let val = self.val(storage);
403 let logic2 = &mut self.logic;
404 self.cursor.map_times(&storage.batch, |time, diff| {
405 logic(&logic2(key, val, time), diff)
406 })
407 }
408
409 #[inline]
410 fn step_key(&mut self, storage: &Self::Storage) {
411 self.cursor.step_key(&storage.batch)
412 }
413 #[inline]
414 fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
415 self.cursor.seek_key(&storage.batch, key)
416 }
417
418 #[inline]
419 fn step_val(&mut self, storage: &Self::Storage) {
420 self.cursor.step_val(&storage.batch)
421 }
422 #[inline]
423 fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
424 self.cursor.seek_val(&storage.batch, val)
425 }
426
427 #[inline]
428 fn rewind_keys(&mut self, storage: &Self::Storage) {
429 self.cursor.rewind_keys(&storage.batch)
430 }
431 #[inline]
432 fn rewind_vals(&mut self, storage: &Self::Storage) {
433 self.cursor.rewind_vals(&storage.batch)
434 }
435}