differential_dataflow/trace/wrappers/
frontier.rs1use timely::progress::{Antichain, frontier::AntichainRef};
10
11use crate::trace::{TraceReader, BatchReader, Description};
12use crate::trace::cursor::Cursor;
13use crate::lattice::Lattice;
14
15pub struct TraceFrontier<Tr: TraceReader> {
17 trace: Tr,
18 since: Antichain<Tr::Time>,
20 until: Antichain<Tr::Time>,
22}
23
24impl<Tr: TraceReader + Clone> Clone for TraceFrontier<Tr> {
25 fn clone(&self) -> Self {
26 TraceFrontier {
27 trace: self.trace.clone(),
28 since: self.since.clone(),
29 until: self.until.clone(),
30 }
31 }
32}
33
34impl<Tr: TraceReader> TraceReader for TraceFrontier<Tr> {
35 type Key<'a> = Tr::Key<'a>;
36 type Val<'a> = Tr::Val<'a>;
37 type Time = Tr::Time;
38 type TimeGat<'a> = Tr::TimeGat<'a>;
39 type Diff = Tr::Diff;
40 type DiffGat<'a> = Tr::DiffGat<'a>;
41
42 type Batch = BatchFrontier<Tr::Batch>;
43 type Storage = Tr::Storage;
44 type Cursor = CursorFrontier<Tr::Cursor, Tr::Time>;
45
46 fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
47 let since = self.since.borrow();
48 let until = self.until.borrow();
49 self.trace.map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), since, until)))
50 }
51
52 fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_logical_compaction(frontier) }
53 fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_logical_compaction() }
54
55 fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_physical_compaction(frontier) }
56 fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_physical_compaction() }
57
58 fn cursor_through(&mut self, upper: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
59 let since = self.since.borrow();
60 let until = self.until.borrow();
61 self.trace.cursor_through(upper).map(|(x,y)| (CursorFrontier::new(x, since, until), y))
62 }
63}
64
65impl<Tr: TraceReader> TraceFrontier<Tr> {
66 pub fn make_from(trace: Tr, since: AntichainRef<Tr::Time>, until: AntichainRef<Tr::Time>) -> Self {
68 TraceFrontier {
69 trace,
70 since: since.to_owned(),
71 until: until.to_owned(),
72 }
73 }
74}
75
76
77#[derive(Clone)]
79pub struct BatchFrontier<B: BatchReader> {
80 batch: B,
81 since: Antichain<B::Time>,
82 until: Antichain<B::Time>,
83}
84
85impl<B: BatchReader> BatchReader for BatchFrontier<B> {
86 type Key<'a> = B::Key<'a>;
87 type Val<'a> = B::Val<'a>;
88 type Time = B::Time;
89 type TimeGat<'a> = B::TimeGat<'a>;
90 type Diff = B::Diff;
91 type DiffGat<'a> = B::DiffGat<'a>;
92
93 type Cursor = BatchCursorFrontier<B::Cursor>;
94
95 fn cursor(&self) -> Self::Cursor {
96 BatchCursorFrontier::new(self.batch.cursor(), self.since.borrow(), self.until.borrow())
97 }
98 fn len(&self) -> usize { self.batch.len() }
99 fn description(&self) -> &Description<B::Time> { self.batch.description() }
100}
101
102impl<B: BatchReader> BatchFrontier<B> {
103 pub fn make_from(batch: B, since: AntichainRef<B::Time>, until: AntichainRef<B::Time>) -> Self {
105 BatchFrontier {
106 batch,
107 since: since.to_owned(),
108 until: until.to_owned(),
109 }
110 }
111}
112
113pub struct CursorFrontier<C, T> {
115 cursor: C,
116 since: Antichain<T>,
117 until: Antichain<T>
118}
119
120impl<C, T> CursorFrontier<C, T> where T: Clone {
121 fn new(cursor: C, since: AntichainRef<T>, until: AntichainRef<T>) -> Self {
122 CursorFrontier {
123 cursor,
124 since: since.to_owned(),
125 until: until.to_owned(),
126 }
127 }
128}
129
130impl<C: Cursor> Cursor for CursorFrontier<C, C::Time> {
131 type Key<'a> = C::Key<'a>;
132 type Val<'a> = C::Val<'a>;
133 type Time = C::Time;
134 type TimeGat<'a> = C::TimeGat<'a>;
135 type Diff = C::Diff;
136 type DiffGat<'a> = C::DiffGat<'a>;
137
138 type Storage = C::Storage;
139
140 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
141 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
142
143 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
144 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
145
146 #[inline]
147 fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
148 let since = self.since.borrow();
149 let until = self.until.borrow();
150 let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
151 self.cursor.map_times(storage, |time, diff| {
152 use crate::IntoOwned;
153 time.clone_onto(&mut temp);
154 temp.advance_by(since);
155 if !until.less_equal(&temp) {
156 logic(<Self::TimeGat<'_> as IntoOwned>::borrow_as(&temp), diff);
157 }
158 })
159 }
160
161 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
162 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
163
164 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
165 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
166
167 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
168 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
169}
170
171
172
173pub struct BatchCursorFrontier<C: Cursor> {
175 cursor: C,
176 since: Antichain<C::Time>,
177 until: Antichain<C::Time>,
178}
179
180impl<C: Cursor> BatchCursorFrontier<C> {
181 fn new(cursor: C, since: AntichainRef<C::Time>, until: AntichainRef<C::Time>) -> Self {
182 BatchCursorFrontier {
183 cursor,
184 since: since.to_owned(),
185 until: until.to_owned(),
186 }
187 }
188}
189
190impl<C: Cursor> Cursor for BatchCursorFrontier<C>
191where
192 C::Storage: BatchReader,
193{
194 type Key<'a> = C::Key<'a>;
195 type Val<'a> = C::Val<'a>;
196 type Time = C::Time;
197 type TimeGat<'a> = C::TimeGat<'a>;
198 type Diff = C::Diff;
199 type DiffGat<'a> = C::DiffGat<'a>;
200
201 type Storage = BatchFrontier<C::Storage>;
202
203 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
204 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
205
206 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
207 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
208
209 #[inline]
210 fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
211 let since = self.since.borrow();
212 let until = self.until.borrow();
213 let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
214 self.cursor.map_times(&storage.batch, |time, diff| {
215 use crate::IntoOwned;
216 time.clone_onto(&mut temp);
217 temp.advance_by(since);
218 if !until.less_equal(&temp) {
219 logic(<Self::TimeGat<'_> as IntoOwned>::borrow_as(&temp), diff);
220 }
221 })
222 }
223
224 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
225 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
226
227 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
228 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
229
230 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
231 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
232}