differential_dataflow/trace/wrappers/
freeze.rs1use std::rc::Rc;
21
22use timely::dataflow::Scope;
23use timely::dataflow::operators::Map;
24use timely::progress::frontier::AntichainRef;
25
26use crate::operators::arrange::Arranged;
27use crate::trace::{TraceReader, BatchReader, Description};
28use crate::trace::cursor::Cursor;
29use crate::IntoOwned;
30
31pub fn freeze<G, T, F>(arranged: &Arranged<G, T>, func: F) -> Arranged<G, TraceFreeze<T, F>>
37where
38 G: Scope<Timestamp=T::Time>,
39 T: TraceReader+Clone,
40 F: Fn(T::TimeGat<'_>)->Option<T::Time>+'static,
41{
42 let func1 = Rc::new(func);
43 let func2 = func1.clone();
44 Arranged {
45 stream: arranged.stream.map(move |bw| BatchFreeze::make_from(bw, func1.clone())),
46 trace: TraceFreeze::make_from(arranged.trace.clone(), func2),
47 }
48}
49
50pub struct TraceFreeze<Tr, F>
52where
53 Tr: TraceReader,
54 F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>,
55{
56 trace: Tr,
57 func: Rc<F>,
58}
59
60impl<Tr,F> Clone for TraceFreeze<Tr, F>
61where
62 Tr: TraceReader+Clone,
63 F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>,
64{
65 fn clone(&self) -> Self {
66 TraceFreeze {
67 trace: self.trace.clone(),
68 func: self.func.clone(),
69 }
70 }
71}
72
73impl<Tr, F> TraceReader for TraceFreeze<Tr, F>
74where
75 Tr: TraceReader,
76 Tr::Batch: Clone,
77 F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>+'static,
78{
79 type Key<'a> = Tr::Key<'a>;
80 type Val<'a> = Tr::Val<'a>;
81 type Time = Tr::Time;
82 type TimeGat<'a> = Tr::TimeGat<'a>;
83 type Diff = Tr::Diff;
84 type DiffGat<'a> = Tr::DiffGat<'a>;
85
86 type Batch = BatchFreeze<Tr::Batch, F>;
87 type Storage = Tr::Storage;
88 type Cursor = CursorFreeze<Tr::Cursor, F>;
89
90 fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
91 let func = &self.func;
92 self.trace.map_batches(|batch| {
93 f(&Self::Batch::make_from(batch.clone(), func.clone()));
94 })
95 }
96
97 fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_logical_compaction(frontier) }
98 fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_logical_compaction() }
99
100 fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_physical_compaction(frontier) }
101 fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_physical_compaction() }
102
103 fn cursor_through(&mut self, upper: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
104 let func = &self.func;
105 self.trace.cursor_through(upper)
106 .map(|(cursor, storage)| (CursorFreeze::new(cursor, func.clone()), storage))
107 }
108}
109
110impl<Tr, F> TraceFreeze<Tr, F>
111where
112 Tr: TraceReader,
113 Tr::Batch: Clone,
114 F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>,
115{
116 pub fn make_from(trace: Tr, func: Rc<F>) -> Self {
118 Self { trace, func }
119 }
120}
121
122
123pub struct BatchFreeze<B, F> {
125 batch: B,
126 func: Rc<F>,
127}
128
129impl<B: Clone, F> Clone for BatchFreeze<B, F> {
130 fn clone(&self) -> Self {
131 BatchFreeze {
132 batch: self.batch.clone(),
133 func: self.func.clone(),
134 }
135 }
136}
137
138impl<B, F> BatchReader for BatchFreeze<B, F>
139where
140 B: BatchReader,
141 F: Fn(B::TimeGat<'_>)->Option<B::Time>,
142{
143 type Key<'a> = B::Key<'a>;
144 type Val<'a> = B::Val<'a>;
145 type Time = B::Time;
146 type TimeGat<'a> = B::TimeGat<'a>;
147 type Diff = B::Diff;
148 type DiffGat<'a> = B::DiffGat<'a>;
149
150 type Cursor = BatchCursorFreeze<B::Cursor, F>;
151
152 fn cursor(&self) -> Self::Cursor {
153 BatchCursorFreeze::new(self.batch.cursor(), self.func.clone())
154 }
155 fn len(&self) -> usize { self.batch.len() }
156 fn description(&self) -> &Description<B::Time> { self.batch.description() }
157}
158
159impl<B, F> BatchFreeze<B, F>
160where
161 B: BatchReader,
162 F: Fn(B::TimeGat<'_>)->Option<B::Time>
163{
164 pub fn make_from(batch: B, func: Rc<F>) -> Self {
166 Self { batch, func }
167 }
168}
169
170pub struct CursorFreeze<C, F> {
172 cursor: C,
173 func: Rc<F>,
174}
175
176impl<C, F> CursorFreeze<C, F> {
177 fn new(cursor: C, func: Rc<F>) -> Self {
178 Self { cursor, func }
179 }
180}
181
182impl<C, F> Cursor for CursorFreeze<C, F>
183where
184 C: Cursor,
185 F: Fn(C::TimeGat<'_>)->Option<C::Time>,
186{
187 type Key<'a> = C::Key<'a>;
188 type Val<'a> = C::Val<'a>;
189 type Time = C::Time;
190 type TimeGat<'a> = C::TimeGat<'a>;
191 type Diff = C::Diff;
192 type DiffGat<'a> = C::DiffGat<'a>;
193
194 type Storage = C::Storage;
195
196 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
197 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
198
199 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
200 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
201
202 #[inline] fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
203 let func = &self.func;
204 self.cursor.map_times(storage, |time, diff| {
205 if let Some(time) = func(time) {
206 logic(<Self::TimeGat<'_> as IntoOwned>::borrow_as(&time), diff);
207 }
208 })
209 }
210
211 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
212 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
213
214 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
215 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
216
217 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
218 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
219}
220
221
222pub struct BatchCursorFreeze<C, F> {
224 cursor: C,
225 func: Rc<F>,
226}
227
228impl<C, F> BatchCursorFreeze<C, F> {
229 fn new(cursor: C, func: Rc<F>) -> Self {
230 Self { cursor, func }
231 }
232}
233
234impl<C: Cursor, F> Cursor for BatchCursorFreeze<C, F>
236where
237 F: Fn(C::TimeGat<'_>)->Option<C::Time>,
238{
239 type Key<'a> = C::Key<'a>;
240 type Val<'a> = C::Val<'a>;
241 type Time = C::Time;
242 type TimeGat<'a> = C::TimeGat<'a>;
243 type Diff = C::Diff;
244 type DiffGat<'a> = C::DiffGat<'a>;
245
246 type Storage = BatchFreeze<C::Storage, F>;
247
248 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
249 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
250
251 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
252 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
253
254 #[inline] fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
255 let func = &self.func;
256 self.cursor.map_times(&storage.batch, |time, diff| {
257 if let Some(time) = func(time) {
258 logic(<Self::TimeGat<'_> as IntoOwned>::borrow_as(&time), diff);
259 }
260 })
261 }
262
263 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
264 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
265
266 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
267 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
268
269 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
270 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
271}