differential_dataflow/trace/wrappers/
freeze.rs1use std::rc::Rc;
21
22use timely::dataflow::Scope;
23use timely::dataflow::operators::Map;
24use timely::progress::frontier::AntichainRef;
25
26use crate::operators::arrange::Arranged;
27use crate::lattice::Lattice;
28use crate::trace::{TraceReader, BatchReader, Description};
29use crate::trace::cursor::Cursor;
30
31pub fn freeze<G, T, F>(arranged: &Arranged<G, T>, func: F) -> Arranged<G, TraceFreeze<T, F>>
37where
38 G: Scope,
39 G::Timestamp: Lattice+Ord,
40 T: TraceReader<Time=G::Timestamp>+Clone,
41 T::Diff: 'static,
42 F: Fn(&G::Timestamp)->Option<G::Timestamp>+'static,
43{
44 let func1 = Rc::new(func);
45 let func2 = func1.clone();
46 Arranged {
47 stream: arranged.stream.map(move |bw| BatchFreeze::make_from(bw, func1.clone())),
48 trace: TraceFreeze::make_from(arranged.trace.clone(), func2),
49 }
50}
51
52pub struct TraceFreeze<Tr, F>
54where
55 Tr: TraceReader,
56 Tr::Time: Lattice+Clone+'static,
57 F: Fn(&Tr::Time)->Option<Tr::Time>,
58{
59 trace: Tr,
60 func: Rc<F>,
61}
62
63impl<Tr,F> Clone for TraceFreeze<Tr, F>
64where
65 Tr: TraceReader+Clone,
66 Tr::Time: Lattice+Clone+'static,
67 F: Fn(&Tr::Time)->Option<Tr::Time>,
68{
69 fn clone(&self) -> Self {
70 TraceFreeze {
71 trace: self.trace.clone(),
72 func: self.func.clone(),
73 }
74 }
75}
76
77impl<Tr, F> TraceReader for TraceFreeze<Tr, F>
78where
79 Tr: TraceReader,
80 Tr::Batch: Clone,
81 Tr::Time: Lattice+Clone+'static,
82 Tr::Diff: 'static,
83 F: Fn(&Tr::Time)->Option<Tr::Time>+'static,
84{
85 type Key<'a> = Tr::Key<'a>;
86 type KeyOwned = Tr::KeyOwned;
87 type Val<'a> = Tr::Val<'a>;
88 type ValOwned = Tr::ValOwned;
89 type Time = Tr::Time;
90 type Diff = Tr::Diff;
91
92 type Batch = BatchFreeze<Tr::Batch, F>;
93 type Storage = Tr::Storage;
94 type Cursor = CursorFreeze<Tr::Cursor, F>;
95
96 fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
97 let func = &self.func;
98 self.trace.map_batches(|batch| {
99 f(&Self::Batch::make_from(batch.clone(), func.clone()));
100 })
101 }
102
103 fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_logical_compaction(frontier) }
104 fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_logical_compaction() }
105
106 fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_physical_compaction(frontier) }
107 fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_physical_compaction() }
108
109 fn cursor_through(&mut self, upper: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
110 let func = &self.func;
111 self.trace.cursor_through(upper)
112 .map(|(cursor, storage)| (CursorFreeze::new(cursor, func.clone()), storage))
113 }
114}
115
116impl<Tr, F> TraceFreeze<Tr, F>
117where
118 Tr: TraceReader,
119 Tr::Batch: Clone,
120 Tr::Time: Lattice+Clone+'static,
121 Tr::Diff: 'static,
122 F: Fn(&Tr::Time)->Option<Tr::Time>,
123{
124 pub fn make_from(trace: Tr, func: Rc<F>) -> Self {
126 Self { trace, func }
127 }
128}
129
130
/// A batch paired with a shared time-mapping function.
pub struct BatchFreeze<B, F> {
    /// The wrapped batch.
    batch: B,
    /// Function handed to the cursors created from this batch.
    func: Rc<F>,
}

/// Cloning duplicates the wrapped batch and shares the same function.
impl<B: Clone, F> Clone for BatchFreeze<B, F> {
    fn clone(&self) -> Self {
        let Self { batch, func } = self;
        Self {
            batch: batch.clone(),
            // Only the reference count is bumped; the function itself is not copied.
            func: Rc::clone(func),
        }
    }
}
145
146impl<B, F> BatchReader for BatchFreeze<B, F>
147where
148 B: BatchReader,
149 B::Time: Clone,
150 F: Fn(&B::Time)->Option<B::Time>,
151{
152 type Key<'a> = B::Key<'a>;
153 type KeyOwned = B::KeyOwned;
154 type Val<'a> = B::Val<'a>;
155 type ValOwned = B::ValOwned;
156 type Time = B::Time;
157 type Diff = B::Diff;
158
159 type Cursor = BatchCursorFreeze<B::Cursor, F>;
160
161 fn cursor(&self) -> Self::Cursor {
162 BatchCursorFreeze::new(self.batch.cursor(), self.func.clone())
163 }
164 fn len(&self) -> usize { self.batch.len() }
165 fn description(&self) -> &Description<B::Time> { self.batch.description() }
166}
167
168impl<B, F> BatchFreeze<B, F>
169where
170 B: BatchReader,
171 B::Time: Clone,
172 F: Fn(&B::Time)->Option<B::Time>
173{
174 pub fn make_from(batch: B, func: Rc<F>) -> Self {
176 Self { batch, func }
177 }
178}
179
/// A trace cursor paired with a shared time-mapping function.
pub struct CursorFreeze<C, F> {
    /// The wrapped cursor; navigation is forwarded to it.
    cursor: C,
    /// Function applied to each time the wrapped cursor reports.
    func: Rc<F>,
}

impl<C, F> CursorFreeze<C, F> {
    /// Pairs `cursor` with the shared function `func`.
    fn new(cursor: C, func: Rc<F>) -> Self {
        CursorFreeze { cursor, func }
    }
}
191
192impl<C, F> Cursor for CursorFreeze<C, F>
193where
194 C: Cursor,
195 C::Time: Sized,
196 F: Fn(&C::Time)->Option<C::Time>,
197{
198 type Key<'a> = C::Key<'a>;
199 type KeyOwned = C::KeyOwned;
200 type Val<'a> = C::Val<'a>;
201 type ValOwned = C::ValOwned;
202 type Time = C::Time;
203 type Diff = C::Diff;
204
205 type Storage = C::Storage;
206
207 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
208 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
209
210 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
211 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
212
213 #[inline] fn map_times<L: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L) {
214 let func = &self.func;
215 self.cursor.map_times(storage, |time, diff| {
216 if let Some(time) = func(time) {
217 logic(&time, diff);
218 }
219 })
220 }
221
222 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
223 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
224
225 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
226 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
227
228 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
229 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
230}
231
232
/// A batch cursor paired with a shared time-mapping function.
pub struct BatchCursorFreeze<C, F> {
    /// The wrapped cursor; navigation is forwarded to it.
    cursor: C,
    /// Function applied to each time the wrapped cursor reports.
    func: Rc<F>,
}

impl<C, F> BatchCursorFreeze<C, F> {
    /// Pairs `cursor` with the shared function `func`.
    fn new(cursor: C, func: Rc<F>) -> Self {
        BatchCursorFreeze { cursor, func }
    }
}
244
245impl<C: Cursor, F> Cursor for BatchCursorFreeze<C, F>
247where
248 F: Fn(&C::Time)->Option<C::Time>,
249{
250 type Key<'a> = C::Key<'a>;
251 type KeyOwned = C::KeyOwned;
252 type Val<'a> = C::Val<'a>;
253 type ValOwned = C::ValOwned;
254 type Time = C::Time;
255 type Diff = C::Diff;
256
257 type Storage = BatchFreeze<C::Storage, F>;
258
259 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
260 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
261
262 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
263 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
264
265 #[inline] fn map_times<L: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L) {
266 let func = &self.func;
267 self.cursor.map_times(&storage.batch, |time, diff| {
268 if let Some(time) = func(time) {
269 logic(&time, diff);
270 }
271 })
272 }
273
274 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
275 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
276
277 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
278 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
279
280 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
281 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
282}