1use timely::progress::timestamp::Refines;
4use timely::progress::Timestamp;
5use timely::progress::{Antichain, frontier::AntichainRef};
6
7use crate::lattice::Lattice;
8use crate::trace::{TraceReader, BatchReader, Description};
9use crate::trace::cursor::Cursor;
10
11pub struct TraceEnter<Tr, TInner, F, G>
20where
21 Tr: TraceReader,
22{
23 trace: Tr,
24 stash1: Antichain<Tr::Time>,
25 stash2: Antichain<TInner>,
26 logic: F,
27 prior: G,
28}
29
30impl<Tr,TInner,F,G> Clone for TraceEnter<Tr, TInner, F, G>
31where
32 Tr: TraceReader+Clone,
33 F: Clone,
34 G: Clone,
35{
36 fn clone(&self) -> Self {
37 TraceEnter {
38 trace: self.trace.clone(),
39 stash1: Antichain::new(),
40 stash2: Antichain::new(),
41 logic: self.logic.clone(),
42 prior: self.prior.clone(),
43 }
44 }
45}
46
47impl<Tr, TInner, F, G> TraceReader for TraceEnter<Tr, TInner, F, G>
48where
49 Tr: TraceReader,
50 Tr::Batch: Clone,
51 Tr::Time: Timestamp,
52 TInner: Refines<Tr::Time>+Lattice,
53 Tr::Diff: 'static,
54 F: 'static,
55 F: FnMut(Tr::Key<'_>, Tr::Val<'_>, &Tr::Time)->TInner+Clone,
56 G: FnMut(&TInner)->Tr::Time+Clone+'static,
57{
58 type Key<'a> = Tr::Key<'a>;
59 type KeyOwned = Tr::KeyOwned;
60 type Val<'a> = Tr::Val<'a>;
61 type ValOwned = Tr::ValOwned;
62 type Time = TInner;
63 type Diff = Tr::Diff;
64
65 type Batch = BatchEnter<Tr::Batch, TInner,F>;
66 type Storage = Tr::Storage;
67 type Cursor = CursorEnter<Tr::Cursor, TInner,F>;
68
69 fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
70 let logic = self.logic.clone();
71 self.trace.map_batches(|batch| {
72 f(&Self::Batch::make_from(batch.clone(), logic.clone()));
73 })
74 }
75
76 fn set_logical_compaction(&mut self, frontier: AntichainRef<TInner>) {
77 self.stash1.clear();
78 for time in frontier.iter() {
79 self.stash1.insert((self.prior)(time));
80 }
81 self.trace.set_logical_compaction(self.stash1.borrow());
82 }
83 fn get_logical_compaction(&mut self) -> AntichainRef<TInner> {
84 self.stash2.clear();
85 for time in self.trace.get_logical_compaction().iter() {
86 self.stash2.insert(TInner::to_inner(time.clone()));
87 }
88 self.stash2.borrow()
89 }
90
91 fn set_physical_compaction(&mut self, frontier: AntichainRef<TInner>) {
92 self.stash1.clear();
93 for time in frontier.iter() {
94 self.stash1.insert((self.prior)(time));
95 }
96 self.trace.set_physical_compaction(self.stash1.borrow());
97 }
98 fn get_physical_compaction(&mut self) -> AntichainRef<TInner> {
99 self.stash2.clear();
100 for time in self.trace.get_physical_compaction().iter() {
101 self.stash2.insert(TInner::to_inner(time.clone()));
102 }
103 self.stash2.borrow()
104 }
105
106 fn cursor_through(&mut self, upper: AntichainRef<TInner>) -> Option<(Self::Cursor, Self::Storage)> {
107 self.stash1.clear();
108 for time in upper.iter() {
109 self.stash1.insert(time.clone().to_outer());
110 }
111 self.trace.cursor_through(self.stash1.borrow()).map(|(x,y)| (CursorEnter::new(x, self.logic.clone()), y))
112 }
113}
114
115impl<Tr, TInner, F, G> TraceEnter<Tr, TInner, F, G>
116where
117 Tr: TraceReader,
118 Tr::Time: Timestamp,
119 TInner: Refines<Tr::Time>+Lattice,
120{
121 pub fn make_from(trace: Tr, logic: F, prior: G) -> Self {
123 TraceEnter {
124 trace,
125 stash1: Antichain::new(),
126 stash2: Antichain::new(),
127 logic,
128 prior,
129 }
130 }
131}
132
133
134#[derive(Clone)]
136pub struct BatchEnter<B, TInner, F> {
137 batch: B,
138 description: Description<TInner>,
139 logic: F,
140}
141
142impl<B, TInner, F> BatchReader for BatchEnter<B, TInner, F>
143where
144 B: BatchReader,
145 B::Time: Timestamp,
146 TInner: Refines<B::Time>+Lattice,
147 F: FnMut(B::Key<'_>, <B::Cursor as Cursor>::Val<'_>, &B::Time)->TInner+Clone,
148{
149 type Key<'a> = B::Key<'a>;
150 type KeyOwned = B::KeyOwned;
151 type Val<'a> = B::Val<'a>;
152 type ValOwned = B::ValOwned;
153 type Time = TInner;
154 type Diff = B::Diff;
155
156 type Cursor = BatchCursorEnter<B::Cursor, TInner, F>;
157
158 fn cursor(&self) -> Self::Cursor {
159 BatchCursorEnter::new(self.batch.cursor(), self.logic.clone())
160 }
161 fn len(&self) -> usize { self.batch.len() }
162 fn description(&self) -> &Description<TInner> { &self.description }
163}
164
165impl<B, TInner, F> BatchEnter<B, TInner, F>
166where
167 B: BatchReader,
168 B::Time: Timestamp,
169 TInner: Refines<B::Time>+Lattice,
170{
171 pub fn make_from(batch: B, logic: F) -> Self {
173 let lower: Vec<_> = batch.description().lower().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
174 let upper: Vec<_> = batch.description().upper().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
175 let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
176
177 BatchEnter {
178 batch,
179 description: Description::new(Antichain::from(lower), Antichain::from(upper), Antichain::from(since)),
180 logic,
181 }
182 }
183}
184
185pub struct CursorEnter<C, TInner, F> {
187 phantom: ::std::marker::PhantomData<TInner>,
188 cursor: C,
189 logic: F,
190}
191
192impl<C, TInner, F> CursorEnter<C, TInner, F> {
193 fn new(cursor: C, logic: F) -> Self {
194 CursorEnter {
195 phantom: ::std::marker::PhantomData,
196 cursor,
197 logic,
198 }
199 }
200}
201
202impl<C, TInner, F> Cursor for CursorEnter<C, TInner, F>
203where
204 C: Cursor,
205 C::Time: Timestamp,
206 TInner: Refines<C::Time>+Lattice,
207 F: FnMut(C::Key<'_>, C::Val<'_>, &C::Time)->TInner,
208{
209 type Key<'a> = C::Key<'a>;
210 type KeyOwned = C::KeyOwned;
211 type Val<'a> = C::Val<'a>;
212 type ValOwned = C::ValOwned;
213 type Time = TInner;
214 type Diff = C::Diff;
215
216 type Storage = C::Storage;
217
218 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
219 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
220
221 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
222 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
223
224 #[inline]
225 fn map_times<L: FnMut(&TInner, &Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L) {
226 let key = self.key(storage);
227 let val = self.val(storage);
228 let logic2 = &mut self.logic;
229 self.cursor.map_times(storage, |time, diff| {
230 logic(&logic2(key, val, time), diff)
231 })
232 }
233
234 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
235 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
236
237 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
238 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
239
240 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
241 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
242}
243
244
245
246pub struct BatchCursorEnter<C, TInner, F> {
248 phantom: ::std::marker::PhantomData<TInner>,
249 cursor: C,
250 logic: F,
251}
252
253impl<C, TInner, F> BatchCursorEnter<C, TInner, F> {
254 fn new(cursor: C, logic: F) -> Self {
255 BatchCursorEnter {
256 phantom: ::std::marker::PhantomData,
257 cursor,
258 logic,
259 }
260 }
261}
262
263impl<TInner, C: Cursor, F> Cursor for BatchCursorEnter<C, TInner, F>
264where
265 C::Time: Timestamp,
266 TInner: Refines<C::Time>+Lattice,
267 F: FnMut(C::Key<'_>, C::Val<'_>, &C::Time)->TInner,
268{
269 type Key<'a> = C::Key<'a>;
270 type KeyOwned = C::KeyOwned;
271 type Val<'a> = C::Val<'a>;
272 type ValOwned = C::ValOwned;
273 type Time = TInner;
274 type Diff = C::Diff;
275
276 type Storage = BatchEnter<C::Storage, TInner, F>;
277
278 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
279 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
280
281 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
282 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
283
284 #[inline]
285 fn map_times<L: FnMut(&TInner, &Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L) {
286 let key = self.key(storage);
287 let val = self.val(storage);
288 let logic2 = &mut self.logic;
289 self.cursor.map_times(&storage.batch, |time, diff| {
290 logic(&logic2(key, val, time), diff)
291 })
292 }
293
294 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
295 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
296
297 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
298 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
299
300 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
301 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
302}