differential_dataflow/trace/wrappers/
freeze.rs

1//! Wrappers to transform the timestamps of updates.
2//!
3//! These wrappers are primarily intended to support the re-use of a multi-version index
4//! as if it were frozen at a particular (nested) timestamp. For example, if one wants to
5//! re-use an index multiple times with minor edits, and only observe the edits at one
6//! logical time (meaning: observing all edits less than or equal to that time, advanced to that
7//! time), this should allow that behavior.
8//!
9//! Informally, this wrapper is parameterized by a function `F: Fn(&T)->Option<T>` which
10//! provides the opportunity to alter the time at which an update happens and to suppress
11//! that update, if appropriate. For example, the function
12//!
13//! ```ignore
14//! |t| if t.inner <= 10 { let mut t = t.clone(); t.inner = 10; Some(t) } else { None }
15//! ```
16//!
17//! could be used to present all updates through inner iteration 10, but advanced to inner
18//! iteration 10, as if they all occurred exactly at that moment.
19
20use std::rc::Rc;
21
22use timely::dataflow::Scope;
23use timely::dataflow::operators::Map;
24use timely::progress::frontier::AntichainRef;
25
26use crate::operators::arrange::Arranged;
27use crate::lattice::Lattice;
28use crate::trace::{TraceReader, BatchReader, Description};
29use crate::trace::cursor::Cursor;
30
31/// Freezes updates to an arrangement using a supplied function.
32///
33/// This method is experimental, and should be used with care. The intent is that the function
34/// `func` can be used to restrict and lock in updates at a particular time, as suggested in the
35/// module-level documentation.
36pub fn freeze<G, T, F>(arranged: &Arranged<G, T>, func: F) -> Arranged<G, TraceFreeze<T, F>>
37where
38    G: Scope,
39    G::Timestamp: Lattice+Ord,
40    T: TraceReader<Time=G::Timestamp>+Clone,
41    T::Diff: 'static,
42    F: Fn(&G::Timestamp)->Option<G::Timestamp>+'static,
43{
44    let func1 = Rc::new(func);
45    let func2 = func1.clone();
46    Arranged {
47        stream: arranged.stream.map(move |bw| BatchFreeze::make_from(bw, func1.clone())),
48        trace: TraceFreeze::make_from(arranged.trace.clone(), func2),
49    }
50}
51
52/// Wrapper to provide trace to nested scope.
53pub struct TraceFreeze<Tr, F>
54where
55    Tr: TraceReader,
56    Tr::Time: Lattice+Clone+'static,
57    F: Fn(&Tr::Time)->Option<Tr::Time>,
58{
59    trace: Tr,
60    func: Rc<F>,
61}
62
63impl<Tr,F> Clone for TraceFreeze<Tr, F>
64where
65    Tr: TraceReader+Clone,
66    Tr::Time: Lattice+Clone+'static,
67    F: Fn(&Tr::Time)->Option<Tr::Time>,
68{
69    fn clone(&self) -> Self {
70        TraceFreeze {
71            trace: self.trace.clone(),
72            func: self.func.clone(),
73        }
74    }
75}
76
77impl<Tr, F> TraceReader for TraceFreeze<Tr, F>
78where
79    Tr: TraceReader,
80    Tr::Batch: Clone,
81    Tr::Time: Lattice+Clone+'static,
82    Tr::Diff: 'static,
83    F: Fn(&Tr::Time)->Option<Tr::Time>+'static,
84{
85    type Key<'a> = Tr::Key<'a>;
86    type KeyOwned = Tr::KeyOwned;
87    type Val<'a> = Tr::Val<'a>;
88    type ValOwned = Tr::ValOwned;
89    type Time = Tr::Time;
90    type Diff = Tr::Diff;
91
92    type Batch = BatchFreeze<Tr::Batch, F>;
93    type Storage = Tr::Storage;
94    type Cursor = CursorFreeze<Tr::Cursor, F>;
95
96    fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
97        let func = &self.func;
98        self.trace.map_batches(|batch| {
99            f(&Self::Batch::make_from(batch.clone(), func.clone()));
100        })
101    }
102
103    fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_logical_compaction(frontier) }
104    fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_logical_compaction() }
105
106    fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_physical_compaction(frontier) }
107    fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_physical_compaction() }
108
109    fn cursor_through(&mut self, upper: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
110        let func = &self.func;
111        self.trace.cursor_through(upper)
112            .map(|(cursor, storage)| (CursorFreeze::new(cursor, func.clone()), storage))
113    }
114}
115
116impl<Tr, F> TraceFreeze<Tr, F>
117where
118    Tr: TraceReader,
119    Tr::Batch: Clone,
120    Tr::Time: Lattice+Clone+'static,
121    Tr::Diff: 'static,
122    F: Fn(&Tr::Time)->Option<Tr::Time>,
123{
124    /// Makes a new trace wrapper
125    pub fn make_from(trace: Tr, func: Rc<F>) -> Self {
126        Self { trace, func }
127    }
128}
129
130
/// A batch paired with a shared function that rewrites or suppresses update times.
pub struct BatchFreeze<B, F> {
    /// The wrapped batch.
    batch: B,
    /// The time-transforming function, shared by reference counting.
    func: Rc<F>,
}
136
137impl<B: Clone, F> Clone for BatchFreeze<B, F> {
138    fn clone(&self) -> Self {
139        BatchFreeze {
140            batch: self.batch.clone(),
141            func: self.func.clone(),
142        }
143    }
144}
145
146impl<B, F> BatchReader for BatchFreeze<B, F>
147where
148    B: BatchReader,
149    B::Time: Clone,
150    F: Fn(&B::Time)->Option<B::Time>,
151{
152    type Key<'a> = B::Key<'a>;
153    type KeyOwned = B::KeyOwned;
154    type Val<'a> = B::Val<'a>;
155    type ValOwned = B::ValOwned;
156    type Time = B::Time;
157    type Diff = B::Diff;
158
159    type Cursor = BatchCursorFreeze<B::Cursor, F>;
160
161    fn cursor(&self) -> Self::Cursor {
162        BatchCursorFreeze::new(self.batch.cursor(), self.func.clone())
163    }
164    fn len(&self) -> usize { self.batch.len() }
165    fn description(&self) -> &Description<B::Time> { self.batch.description() }
166}
167
168impl<B, F> BatchFreeze<B, F>
169where
170    B: BatchReader,
171    B::Time: Clone,
172    F: Fn(&B::Time)->Option<B::Time>
173{
174    /// Makes a new batch wrapper
175    pub fn make_from(batch: B, func: Rc<F>) -> Self {
176        Self { batch, func }
177    }
178}
179
/// A cursor paired with a shared function that rewrites or suppresses update times.
pub struct CursorFreeze<C, F> {
    /// The wrapped cursor.
    cursor: C,
    /// The time-transforming function, shared by reference counting.
    func: Rc<F>,
}
185
186impl<C, F> CursorFreeze<C, F> {
187    fn new(cursor: C, func: Rc<F>) -> Self {
188        Self { cursor, func }
189    }
190}
191
192impl<C, F> Cursor for CursorFreeze<C, F>
193where
194    C: Cursor,
195    C::Time: Sized,
196    F: Fn(&C::Time)->Option<C::Time>,
197{
198    type Key<'a> = C::Key<'a>;
199    type KeyOwned = C::KeyOwned;
200    type Val<'a> = C::Val<'a>;
201    type ValOwned = C::ValOwned;
202    type Time = C::Time;
203    type Diff = C::Diff;
204
205    type Storage = C::Storage;
206
207    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
208    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
209
210    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
211    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
212
213    #[inline] fn map_times<L: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L) {
214        let func = &self.func;
215        self.cursor.map_times(storage, |time, diff| {
216            if let Some(time) = func(time) {
217                logic(&time, diff);
218            }
219        })
220    }
221
222    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
223    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
224
225    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
226    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
227
228    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
229    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
230}
231
232
/// A batch cursor paired with a shared function that rewrites or suppresses update times.
pub struct BatchCursorFreeze<C, F> {
    /// The wrapped cursor.
    cursor: C,
    /// The time-transforming function, shared by reference counting.
    func: Rc<F>,
}
238
239impl<C, F> BatchCursorFreeze<C, F> {
240    fn new(cursor: C, func: Rc<F>) -> Self {
241        Self { cursor, func }
242    }
243}
244
245// impl<C: Cursor<Storage=B, Time=B::Time>, B: BatchReader<Cursor=C>, F> Cursor for BatchCursorFreeze<B, F>
246impl<C: Cursor, F> Cursor for BatchCursorFreeze<C, F>
247where
248    F: Fn(&C::Time)->Option<C::Time>,
249{
250    type Key<'a> = C::Key<'a>;
251    type KeyOwned = C::KeyOwned;
252    type Val<'a> = C::Val<'a>;
253    type ValOwned = C::ValOwned;
254    type Time = C::Time;
255    type Diff = C::Diff;
256
257    type Storage = BatchFreeze<C::Storage, F>;
258
259    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
260    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
261
262    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
263    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
264
265    #[inline] fn map_times<L: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L) {
266        let func = &self.func;
267        self.cursor.map_times(&storage.batch, |time, diff| {
268            if let Some(time) = func(time) {
269                logic(&time, diff);
270            }
271        })
272    }
273
274    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
275    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
276
277    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
278    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
279
280    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
281    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
282}