1pub mod layout;
35pub mod updates;
36pub mod builder;
37pub mod exchange;
38pub mod arrangement;
39pub mod batcher;
40pub mod spill;
41
42pub use updates::UpdatesTyped;
43pub use builder::ValBuilder as ValColBuilder;
44pub use exchange::ValPact;
45pub use arrangement::{ValBatcher, ValBuilder, ValChunker, ValSpine};
46
/// Target size constant, equal to 64 * 1024 = 65536.
///
/// NOTE(review): presumably a size target for container chunks ("links");
/// the units and consumers are not visible in this file — confirm.
pub const LINK_TARGET: usize = 1 << 16;
49
50pub struct RecordedUpdates<U: layout::ColumnarUpdate> {
54 pub updates: updates::Updates<U>,
56 pub records: usize,
58 pub consolidated: bool,
61}
62
63impl<U: layout::ColumnarUpdate> Default for RecordedUpdates<U> {
64 fn default() -> Self { Self { updates: Default::default(), records: 0, consolidated: true } }
65}
66
67impl<U: layout::ColumnarUpdate> Clone for RecordedUpdates<U> {
68 fn clone(&self) -> Self { Self { updates: self.updates.clone(), records: self.records, consolidated: self.consolidated } }
69}
70
71impl<U: layout::ColumnarUpdate> timely::Accountable for RecordedUpdates<U> {
72 #[inline] fn record_count(&self) -> i64 { self.records as i64 }
73}
74
75impl<U: layout::ColumnarUpdate> timely::dataflow::channels::ContainerBytes for RecordedUpdates<U> {
76 fn from_bytes(mut bytes: timely::bytes::arc::Bytes) -> Self {
77 let header = bytes.extract_to(48);
79 let records = u64::from_le_bytes(header[0..8].try_into().unwrap()) as usize;
80 let consolidated = u64::from_le_bytes(header[8..16].try_into().unwrap()) != 0;
81 let keys_len = u64::from_le_bytes(header[16..24].try_into().unwrap()) as usize;
82 let vals_len = u64::from_le_bytes(header[24..32].try_into().unwrap()) as usize;
83 let times_len = u64::from_le_bytes(header[32..40].try_into().unwrap()) as usize;
84 let diffs_len = u64::from_le_bytes(header[40..48].try_into().unwrap()) as usize;
85 let keys_bytes = bytes.extract_to(keys_len);
87 let vals_bytes = bytes.extract_to(vals_len);
88 let times_bytes = bytes.extract_to(times_len);
89 let diffs_bytes = bytes.extract_to(diffs_len);
90 use columnar::bytes::stash::Stash;
91 let keys = Stash::try_from_bytes(keys_bytes).expect("keys decode failed");
92 let vals = Stash::try_from_bytes(vals_bytes).expect("vals decode failed");
93 let times = Stash::try_from_bytes(times_bytes).expect("times decode failed");
94 let diffs = Stash::try_from_bytes(diffs_bytes).expect("diffs decode failed");
95 RecordedUpdates {
96 updates: updates::Updates { keys, vals, times, diffs },
97 records,
98 consolidated,
99 }
100 }
101
102 fn length_in_bytes(&self) -> usize {
103 48 + self.updates.keys.length_in_bytes()
104 + self.updates.vals.length_in_bytes()
105 + self.updates.times.length_in_bytes()
106 + self.updates.diffs.length_in_bytes()
107 }
108
109 fn into_bytes<W: std::io::Write>(&self, writer: &mut W) {
110 let keys_len = self.updates.keys.length_in_bytes() as u64;
111 let vals_len = self.updates.vals.length_in_bytes() as u64;
112 let times_len = self.updates.times.length_in_bytes() as u64;
113 let diffs_len = self.updates.diffs.length_in_bytes() as u64;
114 writer.write_all(&(self.records as u64).to_le_bytes()).unwrap();
116 writer.write_all(&(self.consolidated as u64).to_le_bytes()).unwrap();
117 writer.write_all(&keys_len.to_le_bytes()).unwrap();
118 writer.write_all(&vals_len.to_le_bytes()).unwrap();
119 writer.write_all(×_len.to_le_bytes()).unwrap();
120 writer.write_all(&diffs_len.to_le_bytes()).unwrap();
121 self.updates.keys.write_bytes(writer).unwrap();
123 self.updates.vals.write_bytes(writer).unwrap();
124 self.updates.times.write_bytes(writer).unwrap();
125 self.updates.diffs.write_bytes(writer).unwrap();
126 }
127}
128
129mod container_impls {
131 use columnar::{Columnar, Index, Len, Push};
132 use timely::progress::{Timestamp, timestamp::Refines};
133 use crate::difference::Abelian;
134 use crate::collection::containers::{Negate, Enter, Leave, ResultsIn};
135
136 use super::layout::ColumnarUpdate as Update;
137 use super::updates::UpdatesTyped;
138 use super::RecordedUpdates;
139
140 impl<U: Update<Diff: Abelian>> Negate for RecordedUpdates<U> {
141 fn negate(self) -> Self {
142 use columnar::Container;
143 let RecordedUpdates { mut updates, records, consolidated } = self;
144 let view = updates.view();
145 let old_diffs = view.diffs.values;
146 let mut new_diffs = <<U::Diff as Columnar>::Container as Container>::with_capacity_for([old_diffs].into_iter());
147 let mut owned = U::Diff::default();
148 for i in 0..old_diffs.len() {
149 columnar::Columnar::copy_from(&mut owned, old_diffs.get(i));
150 owned.negate();
151 new_diffs.push(&owned);
152 }
153 updates.diffs.make_typed().values = new_diffs;
155 RecordedUpdates { updates, records, consolidated }
156 }
157 }
158
159 impl<K, V, T1, T2, R> Enter<T1, T2> for RecordedUpdates<(K, V, T1, R)>
160 where
161 (K, V, T1, R): Update<Key=K, Val=V, Time=T1, Diff=R>,
162 (K, V, T2, R): Update<Key=K, Val=V, Time=T2, Diff=R>,
163 T1: Timestamp + Columnar + Default + Clone,
164 T2: Refines<T1> + Columnar + Default + Clone,
165 K: Columnar, V: Columnar, R: Columnar,
166 {
167 type InnerContainer = RecordedUpdates<(K, V, T2, R)>;
168 fn enter(self) -> Self::InnerContainer {
169 use columnar::bytes::stash::Stash;
172 let RecordedUpdates { updates, records, consolidated } = self;
173 let times = updates.times.borrow();
174 let times_values = times.values;
175 let mut new_times = <<T2 as Columnar>::Container as Default>::default();
176 let mut t1_owned = T1::default();
177 for i in 0..times_values.len() {
178 Columnar::copy_from(&mut t1_owned, times_values.get(i));
179 let t2 = T2::to_inner(t1_owned.clone());
180 new_times.push(&t2);
181 }
182 let super::updates::Updates { keys, vals, mut times, diffs } = updates;
185 times.make_typed();
187 let Stash::Typed(times_lists) = times else { unreachable!() };
188 let times = Stash::Typed(super::updates::Lists {
189 values: new_times,
190 bounds: times_lists.bounds,
191 });
192 RecordedUpdates {
193 updates: super::updates::Updates { keys, vals, times, diffs },
194 records,
195 consolidated,
196 }
197 }
198 }
199
200 impl<K, V, T1, T2, R> Leave<T1, T2> for RecordedUpdates<(K, V, T1, R)>
201 where
202 (K, V, T1, R): Update<Key=K, Val=V, Time=T1, Diff=R>,
203 (K, V, T2, R): Update<Key=K, Val=V, Time=T2, Diff=R>,
204 T1: Refines<T2> + Columnar + Default + Clone,
205 T2: Timestamp + Columnar + Default + Clone,
206 K: Columnar, V: Columnar, R: Columnar,
207 {
208 type OuterContainer = RecordedUpdates<(K, V, T2, R)>;
209 fn leave(self) -> Self::OuterContainer {
210 use columnar::bytes::stash::Stash;
214 let RecordedUpdates { updates, records, consolidated: _ } = self;
215 let times = updates.times.borrow();
216 let times_values = times.values;
217 let mut new_times = <<T2 as Columnar>::Container as Default>::default();
218 let mut t1_owned = T1::default();
219 for i in 0..times_values.len() {
220 Columnar::copy_from(&mut t1_owned, times_values.get(i));
221 let t2: T2 = t1_owned.clone().to_outer();
222 new_times.push(&t2);
223 }
224 let super::updates::Updates { keys, vals, mut times, diffs } = updates;
225 times.make_typed();
227 let Stash::Typed(times_lists) = times else { unreachable!() };
228 let times = Stash::Typed(super::updates::Lists {
229 values: new_times,
230 bounds: times_lists.bounds,
231 });
232 let mid = super::updates::Updates { keys, vals, times, diffs };
233 RecordedUpdates {
235 updates: mid.into_typed().consolidate().into(),
236 records,
237 consolidated: true,
238 }
239 }
240 }
241
242 impl<U: Update> ResultsIn<<U::Time as Timestamp>::Summary> for RecordedUpdates<U> {
243 fn results_in(self, step: &<U::Time as Timestamp>::Summary) -> Self {
244 use timely::progress::PathSummary;
245 let mut output = UpdatesTyped::<U>::default();
248 let mut time_owned = U::Time::default();
249 for (k, v, t, d) in self.updates.view().iter() {
251 Columnar::copy_from(&mut time_owned, t);
252 if let Some(new_time) = step.results_in(&time_owned) {
253 output.push((k, v, &new_time, d));
254 }
255 }
256 RecordedUpdates { updates: output.into(), records: self.records, consolidated: false }
259 }
260 }
261}
262
263pub fn join_function<U, I, L>(
268 input: crate::Collection<U::Time, RecordedUpdates<U>>,
269 mut logic: L,
270) -> crate::Collection<U::Time, RecordedUpdates<U>>
271where
272 U::Time: crate::lattice::Lattice,
273 U: layout::ColumnarUpdate<Diff: crate::difference::Multiply<U::Diff, Output = U::Diff>>,
274 I: IntoIterator<Item = (U::Key, U::Val, U::Time, U::Diff)>,
275 L: FnMut(
276 columnar::Ref<'_, U::Key>,
277 columnar::Ref<'_, U::Val>,
278 columnar::Ref<'_, U::Time>,
279 columnar::Ref<'_, U::Diff>,
280 ) -> I + 'static,
281{
282 use timely::dataflow::operators::generic::Operator;
283 use timely::dataflow::channels::pact::Pipeline;
284 use crate::AsCollection;
285 use crate::difference::Multiply;
286 use crate::lattice::Lattice;
287 use columnar::Columnar;
288
289 input
290 .inner
291 .unary::<ValColBuilder<U>, _, _, _>(Pipeline, "JoinFunction", move |_, _| {
292 move |input, output| {
293 let mut t1o = U::Time::default();
294 let mut d1o = U::Diff::default();
295 input.for_each(|time, data| {
296 let mut session = output.session_with_builder(&time);
297 for (k1, v1, t1, d1) in data.updates.view().iter() {
298 Columnar::copy_from(&mut t1o, t1);
299 Columnar::copy_from(&mut d1o, d1);
300 for (k2, v2, t2, d2) in logic(k1, v1, t1, d1) {
301 let t3 = t2.join(&t1o);
302 let d3 = d2.multiply(&d1o);
303 session.give((&k2, &v2, &t3, &d3));
304 }
305 }
306 });
307 }
308 })
309 .as_collection()
310}
311
312pub type DynTime<TOuter, T> = timely::order::Product<TOuter, crate::dynamic::pointstamp::PointStamp<T>>;
315
316pub fn leave_dynamic<K, V, R, TOuter, T>(
323 input: crate::Collection<DynTime<TOuter, T>, RecordedUpdates<(K, V, DynTime<TOuter, T>, R)>>,
324 level: usize,
325) -> crate::Collection<DynTime<TOuter, T>, RecordedUpdates<(K, V, DynTime<TOuter, T>, R)>>
326where
327 K: columnar::Columnar,
328 V: columnar::Columnar,
329 R: columnar::Columnar,
330 TOuter: timely::progress::Timestamp + Default + columnar::Columnar,
331 T: timely::progress::Timestamp + Default + columnar::Columnar,
332 (K, V, DynTime<TOuter, T>, R): layout::ColumnarUpdate<Key = K, Val = V, Time = DynTime<TOuter, T>, Diff = R>,
333{
334 assert!(level > 0, "leave_dynamic requires level > 0");
335 use timely::dataflow::channels::pact::Pipeline;
336 use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
337 use timely::dataflow::operators::generic::OutputBuilder;
338 use timely::order::Product;
339 use timely::progress::Antichain;
340 use timely::container::{ContainerBuilder, PushInto};
341 use crate::AsCollection;
342 use crate::dynamic::pointstamp::{PointStamp, PointStampSummary};
343 use columnar::Columnar;
344
345 let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), input.inner.scope());
346 let (output, stream) = builder.new_output();
347 let mut output = OutputBuilder::from(output);
348 let mut op_input = builder.new_input_connection(
349 input.inner,
350 Pipeline,
351 [(
352 0,
353 Antichain::from_elem(Product {
354 outer: Default::default(),
355 inner: PointStampSummary {
356 retain: Some(level - 1),
357 actions: Vec::new(),
358 },
359 }),
360 )],
361 );
362
363 builder.build(move |_capability| {
364 let mut col_builder = ValColBuilder::<(K, V, DynTime<TOuter, T>, R)>::default();
365 let mut time = DynTime::<TOuter, T>::default();
366 move |_frontier| {
367 let mut output = output.activate();
368 op_input.for_each(|cap, data| {
369 let mut new_time = cap.time().clone();
371 let mut vec = std::mem::take(&mut new_time.inner).into_inner();
372 vec.truncate(level - 1);
373 new_time.inner = PointStamp::new(vec);
374 let new_cap = cap.delayed(&new_time, 0);
375 for (k, v, t, d) in data.updates.view().iter() {
382 Columnar::copy_from(&mut time, t);
383 let mut inner_vec = std::mem::take(&mut time.inner).into_inner();
384 inner_vec.truncate(level - 1);
385 time.inner = PointStamp::new(inner_vec);
386 col_builder.push_into((k, v, &time, d));
387 }
388 let mut session = output.session(&new_cap);
389 while let Some(container) = col_builder.finish() {
390 session.give_container(container);
391 }
392 });
393 }
394 });
395
396 stream.as_collection()
397}
398
399pub fn as_recorded_updates<U>(
404 arranged: crate::operators::arrange::Arranged<
405 crate::operators::arrange::TraceAgent<ValSpine<U::Key, U::Val, U::Time, U::Diff>>,
406 >,
407) -> crate::Collection<U::Time, RecordedUpdates<U>>
408where
409 U: layout::ColumnarUpdate,
410{
411 use timely::dataflow::operators::generic::Operator;
412 use timely::dataflow::channels::pact::Pipeline;
413 use crate::trace::{BatchReader, Cursor};
414 use crate::AsCollection;
415
416 arranged.stream
417 .unary::<ValColBuilder<U>, _, _, _>(Pipeline, "AsRecordedUpdates", |_, _| {
418 move |input, output| {
419 input.for_each(|time, batches| {
420 let mut session = output.session_with_builder(&time);
421 for batch in batches.drain(..) {
422 let mut cursor = batch.cursor();
423 while cursor.key_valid(&batch) {
424 while cursor.val_valid(&batch) {
425 let key = cursor.key(&batch);
426 let val = cursor.val(&batch);
427 cursor.map_times(&batch, |time, diff| {
428 session.give((key, val, time, diff));
429 });
430 cursor.step_val(&batch);
431 }
432 cursor.step_key(&batch);
433 }
434 }
435 });
436 }
437 })
438 .as_collection()
439}