differential_dataflow/trace/wrappers/
filter.rs

1//! Wrapper for filtered trace.
2
3use timely::progress::frontier::AntichainRef;
4
5use crate::trace::{TraceReader, BatchReader, Description};
6use crate::trace::cursor::Cursor;
7
/// Wrapper pairing a trace with a predicate over `(key, val)` pairs.
///
/// The wrapper stores the underlying trace together with the filtering
/// `logic`; the trait implementations on this type hand a clone of `logic`
/// to the batch and cursor wrappers they produce.
///
/// `Clone` is derived, so the wrapper can be cloned whenever both the wrapped
/// trace and the predicate are `Clone`.
#[derive(Clone)]
pub struct TraceFilter<Tr, F> {
    /// The wrapped trace.
    trace: Tr,
    /// Predicate deciding which `(key, val)` pairs are kept.
    logic: F,
}
26
27impl<Tr, F> TraceReader for TraceFilter<Tr, F>
28where
29    Tr: TraceReader,
30    Tr::Batch: Clone,
31    F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static,
32{
33    type Key<'a> = Tr::Key<'a>;
34    type Val<'a> = Tr::Val<'a>;
35    type Time = Tr::Time;
36    type TimeGat<'a> = Tr::TimeGat<'a>;
37    type Diff = Tr::Diff;
38    type DiffGat<'a> = Tr::DiffGat<'a>;
39
40    type Batch = BatchFilter<Tr::Batch, F>;
41    type Storage = Tr::Storage;
42    type Cursor = CursorFilter<Tr::Cursor, F>;
43
44    fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
45        let logic = self.logic.clone();
46        self.trace
47            .map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), logic.clone())))
48    }
49
50    fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_logical_compaction(frontier) }
51    fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_logical_compaction() }
52
53    fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_physical_compaction(frontier) }
54    fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_physical_compaction() }
55
56    fn cursor_through(&mut self, upper: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
57        self.trace.cursor_through(upper).map(|(x,y)| (CursorFilter::new(x, self.logic.clone()), y))
58    }
59}
60
61impl<Tr, F> TraceFilter<Tr, F>
62where
63    Tr: TraceReader,
64{
65    /// Makes a new trace wrapper
66    pub fn make_from(trace: Tr, logic: F) -> Self {
67        TraceFilter {
68            trace,
69            logic,
70        }
71    }
72}
73
74
75/// Wrapper to provide batch to nested scope.
76#[derive(Clone)]
77pub struct BatchFilter<B, F> {
78    batch: B,
79    logic: F,
80}
81
82impl<B, F> BatchReader for BatchFilter<B, F>
83where
84    B: BatchReader,
85    F: FnMut(B::Key<'_>, B::Val<'_>)->bool+Clone+'static
86{
87    type Key<'a> = B::Key<'a>;
88    type Val<'a> = B::Val<'a>;
89    type Time = B::Time;
90    type TimeGat<'a> = B::TimeGat<'a>;
91    type Diff = B::Diff;
92    type DiffGat<'a> = B::DiffGat<'a>;
93
94    type Cursor = BatchCursorFilter<B::Cursor, F>;
95
96    fn cursor(&self) -> Self::Cursor {
97        BatchCursorFilter::new(self.batch.cursor(), self.logic.clone())
98    }
99    fn len(&self) -> usize { self.batch.len() }
100    fn description(&self) -> &Description<B::Time> { self.batch.description() }
101}
102
103impl<B, F> BatchFilter<B, F>
104where
105    B: BatchReader,
106{
107    /// Makes a new batch wrapper
108    pub fn make_from(batch: B, logic: F) -> Self {
109        BatchFilter {
110            batch,
111            logic,
112        }
113    }
114}
115
/// Wrapper pairing a trace cursor with a predicate over `(key, val)` pairs.
pub struct CursorFilter<C, F> {
    /// The wrapped cursor.
    cursor: C,
    /// Predicate consulted before times are reported for a `(key, val)` pair.
    logic: F,
}

impl<C, F> CursorFilter<C, F> {
    /// Bundles `cursor` with the predicate `logic`.
    fn new(cursor: C, logic: F) -> Self {
        Self { cursor, logic }
    }
}
130
131impl<C, F> Cursor for CursorFilter<C, F>
132where
133    C: Cursor,
134    F: FnMut(C::Key<'_>, C::Val<'_>)->bool+'static
135{
136    type Key<'a> = C::Key<'a>;
137    type Val<'a> = C::Val<'a>;
138    type Time = C::Time;
139    type TimeGat<'a> = C::TimeGat<'a>;
140    type Diff = C::Diff;
141    type DiffGat<'a> = C::DiffGat<'a>;
142
143    type Storage = C::Storage;
144
145    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
146    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
147
148    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
149    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
150
151    #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
152    #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }
153
154    #[inline]
155    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
156        let key = self.key(storage);
157        let val = self.val(storage);
158        if (self.logic)(key, val) {
159            self.cursor.map_times(storage, logic)
160        }
161    }
162
163    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
164    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
165
166    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
167    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
168
169    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
170    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
171}
172
173
174
/// Wrapper pairing a batch cursor with a predicate over `(key, val)` pairs.
pub struct BatchCursorFilter<C, F> {
    /// The wrapped cursor.
    cursor: C,
    /// Predicate consulted before times are reported for a `(key, val)` pair.
    logic: F,
}

impl<C, F> BatchCursorFilter<C, F> {
    /// Bundles `cursor` with the predicate `logic`.
    fn new(cursor: C, logic: F) -> Self {
        Self { cursor, logic }
    }
}
189
190impl<C: Cursor, F> Cursor for BatchCursorFilter<C, F>
191where
192    F: FnMut(C::Key<'_>, C::Val<'_>)->bool+'static,
193{
194    type Key<'a> = C::Key<'a>;
195    type Val<'a> = C::Val<'a>;
196    type Time = C::Time;
197    type TimeGat<'a> = C::TimeGat<'a>;
198    type Diff = C::Diff;
199    type DiffGat<'a> = C::DiffGat<'a>;
200
201    type Storage = BatchFilter<C::Storage, F>;
202
203    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
204    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
205
206    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
207    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
208
209    #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }
210    #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
211
212    #[inline]
213    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
214        let key = self.key(storage);
215        let val = self.val(storage);
216        if (self.logic)(key, val) {
217            self.cursor.map_times(&storage.batch, logic)
218        }
219    }
220
221    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
222    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
223
224    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
225    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
226
227    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
228    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
229}