differential_dataflow/trace/wrappers/
freeze.rs1use std::rc::Rc;
21
22use timely::dataflow::Scope;
23use timely::dataflow::operators::Map;
24use timely::progress::frontier::AntichainRef;
25
26use crate::operators::arrange::Arranged;
27use crate::trace::{TraceReader, BatchReader, Description};
28use crate::trace::cursor::Cursor;
29use crate::IntoOwned;
30
31pub fn freeze<G, T, F>(arranged: &Arranged<G, T>, func: F) -> Arranged<G, TraceFreeze<T, F>>
37where
38    G: Scope<Timestamp=T::Time>,
39    T: TraceReader+Clone,
40    F: Fn(T::TimeGat<'_>)->Option<T::Time>+'static,
41{
42    let func1 = Rc::new(func);
43    let func2 = func1.clone();
44    Arranged {
45        stream: arranged.stream.map(move |bw| BatchFreeze::make_from(bw, func1.clone())),
46        trace: TraceFreeze::make_from(arranged.trace.clone(), func2),
47    }
48}
49
50pub struct TraceFreeze<Tr, F>
52where
53    Tr: TraceReader,
54    F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>,
55{
56    trace: Tr,
57    func: Rc<F>,
58}
59
60impl<Tr,F> Clone for TraceFreeze<Tr, F>
61where
62    Tr: TraceReader+Clone,
63    F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>,
64{
65    fn clone(&self) -> Self {
66        TraceFreeze {
67            trace: self.trace.clone(),
68            func: self.func.clone(),
69        }
70    }
71}
72
73impl<Tr, F> TraceReader for TraceFreeze<Tr, F>
74where
75    Tr: TraceReader,
76    Tr::Batch: Clone,
77    F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>+'static,
78{
79    type Key<'a> = Tr::Key<'a>;
80    type Val<'a> = Tr::Val<'a>;
81    type Time = Tr::Time;
82    type TimeGat<'a> = Tr::TimeGat<'a>;
83    type Diff = Tr::Diff;
84    type DiffGat<'a> = Tr::DiffGat<'a>;
85
86    type Batch = BatchFreeze<Tr::Batch, F>;
87    type Storage = Tr::Storage;
88    type Cursor = CursorFreeze<Tr::Cursor, F>;
89
90    fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
91        let func = &self.func;
92        self.trace.map_batches(|batch| {
93            f(&Self::Batch::make_from(batch.clone(), func.clone()));
94        })
95    }
96
97    fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_logical_compaction(frontier) }
98    fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_logical_compaction() }
99
100    fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_physical_compaction(frontier) }
101    fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_physical_compaction() }
102
103    fn cursor_through(&mut self, upper: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
104        let func = &self.func;
105        self.trace.cursor_through(upper)
106            .map(|(cursor, storage)| (CursorFreeze::new(cursor, func.clone()), storage))
107    }
108}
109
110impl<Tr, F> TraceFreeze<Tr, F>
111where
112    Tr: TraceReader,
113    Tr::Batch: Clone,
114    F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>,
115{
116    pub fn make_from(trace: Tr, func: Rc<F>) -> Self {
118        Self { trace, func }
119    }
120}
121
122
123pub struct BatchFreeze<B, F> {
125    batch: B,
126    func: Rc<F>,
127}
128
129impl<B: Clone, F> Clone for BatchFreeze<B, F> {
130    fn clone(&self) -> Self {
131        BatchFreeze {
132            batch: self.batch.clone(),
133            func: self.func.clone(),
134        }
135    }
136}
137
138impl<B, F> BatchReader for BatchFreeze<B, F>
139where
140    B: BatchReader,
141    F: Fn(B::TimeGat<'_>)->Option<B::Time>,
142{
143    type Key<'a> = B::Key<'a>;
144    type Val<'a> = B::Val<'a>;
145    type Time = B::Time;
146    type TimeGat<'a> = B::TimeGat<'a>;
147    type Diff = B::Diff;
148    type DiffGat<'a> = B::DiffGat<'a>;
149
150    type Cursor = BatchCursorFreeze<B::Cursor, F>;
151
152    fn cursor(&self) -> Self::Cursor {
153        BatchCursorFreeze::new(self.batch.cursor(), self.func.clone())
154    }
155    fn len(&self) -> usize { self.batch.len() }
156    fn description(&self) -> &Description<B::Time> { self.batch.description() }
157}
158
159impl<B, F> BatchFreeze<B, F>
160where
161    B: BatchReader,
162    F: Fn(B::TimeGat<'_>)->Option<B::Time>
163{
164    pub fn make_from(batch: B, func: Rc<F>) -> Self {
166        Self { batch, func }
167    }
168}
169
170pub struct CursorFreeze<C, F> {
172    cursor: C,
173    func: Rc<F>,
174}
175
176impl<C, F> CursorFreeze<C, F> {
177    fn new(cursor: C, func: Rc<F>) -> Self {
178        Self { cursor, func }
179    }
180}
181
182impl<C, F> Cursor for CursorFreeze<C, F>
183where
184    C: Cursor,
185    F: Fn(C::TimeGat<'_>)->Option<C::Time>,
186{
187    type Key<'a> = C::Key<'a>;
188    type Val<'a> = C::Val<'a>;
189    type Time = C::Time;
190    type TimeGat<'a> = C::TimeGat<'a>;
191    type Diff = C::Diff;
192    type DiffGat<'a> = C::DiffGat<'a>;
193
194    type Storage = C::Storage;
195
196    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
197    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
198
199    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
200    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
201
202    #[inline] fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
203        let func = &self.func;
204        self.cursor.map_times(storage, |time, diff| {
205            if let Some(time) = func(time) {
206                logic(<Self::TimeGat<'_> as IntoOwned>::borrow_as(&time), diff);
207            }
208        })
209    }
210
211    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
212    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
213
214    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
215    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
216
217    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
218    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
219}
220
221
222pub struct BatchCursorFreeze<C, F> {
224    cursor: C,
225    func: Rc<F>,
226}
227
228impl<C, F> BatchCursorFreeze<C, F> {
229    fn new(cursor: C, func: Rc<F>) -> Self {
230        Self { cursor, func }
231    }
232}
233
234impl<C: Cursor, F> Cursor for BatchCursorFreeze<C, F>
236where
237    F: Fn(C::TimeGat<'_>)->Option<C::Time>,
238{
239    type Key<'a> = C::Key<'a>;
240    type Val<'a> = C::Val<'a>;
241    type Time = C::Time;
242    type TimeGat<'a> = C::TimeGat<'a>;
243    type Diff = C::Diff;
244    type DiffGat<'a> = C::DiffGat<'a>;
245
246    type Storage = BatchFreeze<C::Storage, F>;
247
248    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
249    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
250
251    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
252    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
253
254    #[inline] fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
255        let func = &self.func;
256        self.cursor.map_times(&storage.batch, |time, diff| {
257            if let Some(time) = func(time) {
258                logic(<Self::TimeGat<'_> as IntoOwned>::borrow_as(&time), diff);
259            }
260        })
261    }
262
263    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
264    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
265
266    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
267    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
268
269    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
270    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
271}