1use std::rc::Rc;
12
13use crate::containers::TimelyStack;
14use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker};
15use crate::trace::implementations::spine_fueled::Spine;
16use crate::trace::implementations::merge_batcher::MergeBatcher;
17use crate::trace::implementations::merge_batcher::container::{VecInternalMerger, ColInternalMerger};
18use crate::trace::rc_blanket_impls::RcBuilder;
19
20use super::{Layout, Vector, TStack};
21
22pub use self::val_batch::{OrdValBatch, OrdValBuilder};
23pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
24
25pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
27pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ContainerChunker<Vec<((K,V),T,R)>>, VecInternalMerger<(K, V), T, R>>;
29pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
31
32pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
37pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColInternalMerger<(K,V),T,R>>;
39pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;
41
42pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
44pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ContainerChunker<Vec<((K,()),T,R)>>, VecInternalMerger<(K, ()), T, R>>;
46pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;
48
49pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
54pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColInternalMerger<(K,()),T,R>>;
56pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;
58
59pub use layers::{Vals, Upds};
63pub mod layers {
72
73 use serde::{Deserialize, Serialize};
74 use crate::trace::implementations::BatchContainer;
75
76 #[derive(Debug, Serialize, Deserialize)]
78 pub struct Vals<O, V> {
79 pub offs: O,
83 pub vals: V,
85 }
86
87 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Default for Vals<O, V> {
88 fn default() -> Self { Self::with_capacity(0, 0) }
89 }
90
91 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Vals<O, V> {
92 #[inline(always)] pub fn bounds(&self, index: usize) -> (usize, usize) {
94 (self.offs.index(index), self.offs.index(index+1))
95 }
96 pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> V::ReadItem<'_> {
103 self.get_abs(self.bounds(list_idx).0 + item_idx)
104 }
105
106 pub fn len(&self) -> usize { self.offs.len() - 1 }
108 pub fn get_abs(&self, index: usize) -> V::ReadItem<'_> {
110 self.vals.index(index)
111 }
112 pub fn with_capacity(o_size: usize, v_size: usize) -> Self {
114 let mut offs = <O as BatchContainer>::with_capacity(o_size);
115 offs.push_ref(0);
116 Self {
117 offs,
118 vals: <V as BatchContainer>::with_capacity(v_size),
119 }
120 }
121 pub fn merge_capacity(this: &Self, that: &Self) -> Self {
123 let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
124 offs.push_ref(0);
125 Self {
126 offs,
127 vals: <V as BatchContainer>::merge_capacity(&this.vals, &that.vals),
128 }
129 }
130 }
131
132 #[derive(Debug, Serialize, Deserialize)]
137 pub struct Upds<O, T, D> {
138 pub offs: O,
140 pub times: T,
142 pub diffs: D,
144 }
145
146 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Default for Upds<O, T, D> {
147 fn default() -> Self { Self::with_capacity(0, 0) }
148 }
149 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Upds<O, T, D> {
150 pub fn bounds(&self, index: usize) -> (usize, usize) {
152 let mut lower = self.offs.index(index);
153 let upper = self.offs.index(index+1);
154 if lower == upper {
157 assert!(lower > 0);
158 lower -= 1;
159 }
160 (lower, upper)
161 }
162 pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
169 self.get_abs(self.bounds(list_idx).0 + item_idx)
170 }
171
172 pub fn len(&self) -> usize { self.offs.len() - 1 }
174 pub fn get_abs(&self, index: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
176 (self.times.index(index), self.diffs.index(index))
177 }
178 pub fn with_capacity(o_size: usize, u_size: usize) -> Self {
180 let mut offs = <O as BatchContainer>::with_capacity(o_size);
181 offs.push_ref(0);
182 Self {
183 offs,
184 times: <T as BatchContainer>::with_capacity(u_size),
185 diffs: <D as BatchContainer>::with_capacity(u_size),
186 }
187 }
188 pub fn merge_capacity(this: &Self, that: &Self) -> Self {
190 let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
191 offs.push_ref(0);
192 Self {
193 offs,
194 times: <T as BatchContainer>::merge_capacity(&this.times, &that.times),
195 diffs: <D as BatchContainer>::merge_capacity(&this.diffs, &that.diffs),
196 }
197 }
198 }
199
200 pub struct UpdsBuilder<T: BatchContainer, D: BatchContainer> {
202 stash: Vec<(T::Owned, D::Owned)>,
207 total: usize,
211
212 time_con: T,
214 diff_con: D,
216 }
217
218 impl<T: BatchContainer, D: BatchContainer> Default for UpdsBuilder<T, D> {
219 fn default() -> Self { Self { stash: Vec::default(), total: 0, time_con: BatchContainer::with_capacity(1), diff_con: BatchContainer::with_capacity(1) } }
220 }
221
222
223 impl<T, D> UpdsBuilder<T, D>
224 where
225 T: BatchContainer<Owned: Ord>,
226 D: BatchContainer<Owned: crate::difference::Semigroup>,
227 {
228 pub fn push(&mut self, time: T::Owned, diff: D::Owned) {
230 self.stash.push((time, diff));
231 }
232
233 pub fn seal<O: for<'a> BatchContainer<ReadItem<'a> = usize>>(&mut self, upds: &mut Upds<O, T, D>) -> bool {
237 use crate::consolidation;
238 consolidation::consolidate(&mut self.stash);
239 if self.stash.is_empty() { return false; }
241 if self.stash.len() == 1 {
243 let (time, diff) = self.stash.last().unwrap();
244 self.time_con.clear(); self.time_con.push_own(time);
245 self.diff_con.clear(); self.diff_con.push_own(diff);
246 if upds.times.last() == self.time_con.get(0) && upds.diffs.last() == self.diff_con.get(0) {
247 self.total += 1;
248 self.stash.clear();
249 upds.offs.push_ref(upds.times.len());
250 return true;
251 }
252 }
253 self.total += self.stash.len();
255 for (time, diff) in self.stash.drain(..) {
256 upds.times.push_own(&time);
257 upds.diffs.push_own(&diff);
258 }
259 upds.offs.push_ref(upds.times.len());
260 true
261 }
262
263 pub fn total(&self) -> usize { self.total }
265 }
266}
267
268pub mod val_batch {
270
271 use std::marker::PhantomData;
272 use serde::{Deserialize, Serialize};
273 use timely::container::PushInto;
274 use timely::progress::{Antichain, frontier::AntichainRef};
275
276 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
277 use crate::trace::implementations::{BatchContainer, BuilderInput};
278 use crate::trace::implementations::layout;
279
280 use super::{Layout, Vals, Upds, layers::UpdsBuilder};
281
282 #[derive(Debug, Serialize, Deserialize)]
284 #[serde(bound = "
285 L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
286 L::ValContainer: Serialize + for<'a> Deserialize<'a>,
287 L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
288 L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
289 L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
290 ")]
291 pub struct OrdValStorage<L: Layout> {
292 pub keys: L::KeyContainer,
294 pub vals: Vals<L::OffsetContainer, L::ValContainer>,
296 pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
298 }
299
300 #[derive(Serialize, Deserialize)]
305 #[serde(bound = "
306 L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
307 L::ValContainer: Serialize + for<'a> Deserialize<'a>,
308 L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
309 L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
310 L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
311 ")]
312 pub struct OrdValBatch<L: Layout> {
313 pub storage: OrdValStorage<L>,
315 pub description: Description<layout::Time<L>>,
317 pub updates: usize,
323 }
324
325 impl<L: Layout> WithLayout for OrdValBatch<L> {
326 type Layout = L;
327 }
328
329 impl<L: Layout> BatchReader for OrdValBatch<L> {
330
331 type Cursor = OrdValCursor<L>;
332 fn cursor(&self) -> Self::Cursor {
333 OrdValCursor {
334 key_cursor: 0,
335 val_cursor: 0,
336 phantom: PhantomData,
337 }
338 }
339 fn len(&self) -> usize {
340 self.updates
343 }
344 fn description(&self) -> &Description<layout::Time<L>> { &self.description }
345 }
346
347 impl<L: Layout> Batch for OrdValBatch<L> {
348 type Merger = OrdValMerger<L>;
349
350 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
351 OrdValMerger::new(self, other, compaction_frontier)
352 }
353
354 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
355 use timely::progress::Timestamp;
356 Self {
357 storage: OrdValStorage {
358 keys: L::KeyContainer::with_capacity(0),
359 vals: Default::default(),
360 upds: Default::default(),
361 },
362 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
363 updates: 0,
364 }
365 }
366 }
367
368 pub struct OrdValMerger<L: Layout> {
370 key_cursor1: usize,
372 key_cursor2: usize,
374 result: OrdValStorage<L>,
376 description: Description<layout::Time<L>>,
378 staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
380 }
381
382 impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
383 where
384 OrdValBatch<L>: Batch<Time=layout::Time<L>>,
385 {
386 fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {
387
388 assert!(batch1.upper() == batch2.lower());
389 use crate::lattice::Lattice;
390 let mut since = batch1.description().since().join(batch2.description().since());
391 since = since.join(&compaction_frontier.to_owned());
392
393 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
394
395 let batch1 = &batch1.storage;
396 let batch2 = &batch2.storage;
397
398 OrdValMerger {
399 key_cursor1: 0,
400 key_cursor2: 0,
401 result: OrdValStorage {
402 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
403 vals: Vals::merge_capacity(&batch1.vals, &batch2.vals),
404 upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
405 },
406 description,
407 staging: UpdsBuilder::default(),
408 }
409 }
410 fn done(self) -> OrdValBatch<L> {
411 OrdValBatch {
412 updates: self.staging.total(),
413 storage: self.result,
414 description: self.description,
415 }
416 }
417 fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
418
419 let starting_updates = self.staging.total();
421 let mut effort = 0isize;
422
423 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
425 self.merge_key(&source1.storage, &source2.storage);
426 effort = (self.staging.total() - starting_updates) as isize;
428 }
429
430 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
433 self.copy_key(&source1.storage, self.key_cursor1);
434 self.key_cursor1 += 1;
435 effort = (self.staging.total() - starting_updates) as isize;
436 }
437 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
438 self.copy_key(&source2.storage, self.key_cursor2);
439 self.key_cursor2 += 1;
440 effort = (self.staging.total() - starting_updates) as isize;
441 }
442
443 *fuel -= effort;
444 }
445 }
446
447 impl<L: Layout> OrdValMerger<L> {
449 fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
457 let init_vals = self.result.vals.vals.len();
459 let (mut lower, upper) = source.vals.bounds(cursor);
460 while lower < upper {
461 self.stash_updates_for_val(source, lower);
462 if self.staging.seal(&mut self.result.upds) {
463 self.result.vals.vals.push_ref(source.vals.get_abs(lower));
464 }
465 lower += 1;
466 }
467
468 if self.result.vals.vals.len() > init_vals {
470 self.result.keys.push_ref(source.keys.index(cursor));
471 self.result.vals.offs.push_ref(self.result.vals.vals.len());
472 }
473 }
474 fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
479 use ::std::cmp::Ordering;
480 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
481 Ordering::Less => {
482 self.copy_key(source1, self.key_cursor1);
483 self.key_cursor1 += 1;
484 },
485 Ordering::Equal => {
486 let (lower1, upper1) = source1.vals.bounds(self.key_cursor1);
488 let (lower2, upper2) = source2.vals.bounds(self.key_cursor2);
489 if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
490 self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
491 self.result.vals.offs.push_ref(off);
492 }
493 self.key_cursor1 += 1;
495 self.key_cursor2 += 1;
496 },
497 Ordering::Greater => {
498 self.copy_key(source2, self.key_cursor2);
499 self.key_cursor2 += 1;
500 },
501 }
502 }
503 fn merge_vals(
508 &mut self,
509 (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
510 (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
511 ) -> Option<usize> {
512 let init_vals = self.result.vals.vals.len();
514 while lower1 < upper1 && lower2 < upper2 {
515 use ::std::cmp::Ordering;
519 match source1.vals.get_abs(lower1).cmp(&source2.vals.get_abs(lower2)) {
520 Ordering::Less => {
521 self.stash_updates_for_val(source1, lower1);
523 if self.staging.seal(&mut self.result.upds) {
524 self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
525 }
526 lower1 += 1;
527 },
528 Ordering::Equal => {
529 self.stash_updates_for_val(source1, lower1);
530 self.stash_updates_for_val(source2, lower2);
531 if self.staging.seal(&mut self.result.upds) {
532 self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
533 }
534 lower1 += 1;
535 lower2 += 1;
536 },
537 Ordering::Greater => {
538 self.stash_updates_for_val(source2, lower2);
540 if self.staging.seal(&mut self.result.upds) {
541 self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
542 }
543 lower2 += 1;
544 },
545 }
546 }
547 while lower1 < upper1 {
549 self.stash_updates_for_val(source1, lower1);
550 if self.staging.seal(&mut self.result.upds) {
551 self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
552 }
553 lower1 += 1;
554 }
555 while lower2 < upper2 {
556 self.stash_updates_for_val(source2, lower2);
557 if self.staging.seal(&mut self.result.upds) {
558 self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
559 }
560 lower2 += 1;
561 }
562
563 if self.result.vals.vals.len() > init_vals {
565 Some(self.result.vals.vals.len())
566 } else {
567 None
568 }
569 }
570
571 fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
573 let (lower, upper) = source.upds.bounds(index);
574 for i in lower .. upper {
575 let (time, diff) = source.upds.get_abs(i);
577 use crate::lattice::Lattice;
578 let mut new_time: layout::Time<L> = L::TimeContainer::into_owned(time);
579 new_time.advance_by(self.description.since().borrow());
580 self.staging.push(new_time, L::DiffContainer::into_owned(diff));
581 }
582 }
583 }
584
585 pub struct OrdValCursor<L: Layout> {
587 key_cursor: usize,
589 val_cursor: usize,
591 phantom: PhantomData<L>,
593 }
594
595 use crate::trace::implementations::WithLayout;
596 impl<L: Layout> WithLayout for OrdValCursor<L> {
597 type Layout = L;
598 }
599
600 impl<L: Layout> Cursor for OrdValCursor<L> {
601
602 type Storage = OrdValBatch<L>;
603
604 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
605 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
606
607 fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
608 fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.get_abs(self.val_cursor) }
609 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
610 let (lower, upper) = storage.storage.upds.bounds(self.val_cursor);
611 for index in lower .. upper {
612 let (time, diff) = storage.storage.upds.get_abs(index);
613 logic(time, diff);
614 }
615 }
616 fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
617 fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.vals.bounds(self.key_cursor).1 }
618 fn step_key(&mut self, storage: &OrdValBatch<L>){
619 self.key_cursor += 1;
620 if self.key_valid(storage) {
621 self.rewind_vals(storage);
622 }
623 else {
624 self.key_cursor = storage.storage.keys.len();
625 }
626 }
627 fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
628 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
629 if self.key_valid(storage) {
630 self.rewind_vals(storage);
631 }
632 }
633 fn step_val(&mut self, storage: &OrdValBatch<L>) {
634 self.val_cursor += 1;
635 if !self.val_valid(storage) {
636 self.val_cursor = storage.storage.vals.bounds(self.key_cursor).1;
637 }
638 }
639 fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
640 self.val_cursor += storage.storage.vals.vals.advance(self.val_cursor, storage.storage.vals.bounds(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
641 }
642 fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
643 self.key_cursor = 0;
644 if self.key_valid(storage) {
645 self.rewind_vals(storage)
646 }
647 }
648 fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
649 self.val_cursor = storage.storage.vals.bounds(self.key_cursor).0;
650 }
651 }
652
653 pub struct OrdValBuilder<L: Layout, CI> {
655 pub result: OrdValStorage<L>,
659 staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
660 _marker: PhantomData<CI>,
661 }
662
663 impl<L, CI> Builder for OrdValBuilder<L, CI>
664 where
665 L: for<'a> Layout<
666 KeyContainer: PushInto<CI::Key<'a>>,
667 ValContainer: PushInto<CI::Val<'a>>,
668 >,
669 CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
670 {
671
672 type Input = CI;
673 type Time = layout::Time<L>;
674 type Output = OrdValBatch<L>;
675
676 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
677 Self {
678 result: OrdValStorage {
679 keys: L::KeyContainer::with_capacity(keys),
680 vals: Vals::with_capacity(keys + 1, vals),
681 upds: Upds::with_capacity(vals + 1, upds),
682 },
683 staging: UpdsBuilder::default(),
684 _marker: PhantomData,
685 }
686 }
687
688 #[inline]
689 fn push(&mut self, chunk: &mut Self::Input) {
690 for item in chunk.drain() {
691 let (key, val, time, diff) = CI::into_parts(item);
692
693 if self.result.keys.is_empty() {
695 self.result.vals.vals.push_into(val);
696 self.result.keys.push_into(key);
697 self.staging.push(time, diff);
698 }
699 else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
701 if self.result.vals.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
703 self.staging.push(time, diff);
704 } else {
705 self.staging.seal(&mut self.result.upds);
707 self.staging.push(time, diff);
708 self.result.vals.vals.push_into(val);
709 }
710 } else {
711 self.staging.seal(&mut self.result.upds);
713 self.staging.push(time, diff);
714 self.result.vals.offs.push_ref(self.result.vals.vals.len());
715 self.result.vals.vals.push_into(val);
716 self.result.keys.push_into(key);
717 }
718 }
719 }
720
721 #[inline(never)]
722 fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
723 self.staging.seal(&mut self.result.upds);
724 self.result.vals.offs.push_ref(self.result.vals.vals.len());
725 OrdValBatch {
726 updates: self.staging.total(),
727 storage: self.result,
728 description,
729 }
730 }
731
732 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
733 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
734 let mut builder = Self::with_capacity(keys, vals, upds);
735 for mut chunk in chain.drain(..) {
736 builder.push(&mut chunk);
737 }
738
739 builder.done(description)
740 }
741 }
742}
743
744pub mod key_batch {
746
747 use std::marker::PhantomData;
748 use serde::{Deserialize, Serialize};
749 use timely::container::PushInto;
750 use timely::progress::{Antichain, frontier::AntichainRef};
751
752 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
753 use crate::trace::implementations::{BatchContainer, BuilderInput};
754 use crate::trace::implementations::layout;
755
756 use super::{Layout, Upds, layers::UpdsBuilder};
757
758 #[derive(Debug, Serialize, Deserialize)]
760 #[serde(bound = "
761 L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
762 L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
763 L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
764 L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
765 ")]
766 pub struct OrdKeyStorage<L: Layout> {
767 pub keys: L::KeyContainer,
769 pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
771 }
772
773 #[derive(Serialize, Deserialize)]
778 #[serde(bound = "
779 L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
780 L::ValContainer: Serialize + for<'a> Deserialize<'a>,
781 L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
782 L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
783 L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
784 ")]
785 pub struct OrdKeyBatch<L: Layout> {
786 pub storage: OrdKeyStorage<L>,
788 pub description: Description<layout::Time<L>>,
790 pub updates: usize,
796
797 pub value: L::ValContainer,
799 }
800
801 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> OrdKeyBatch<L> {
802 pub fn create_value() -> L::ValContainer {
804 let mut value = L::ValContainer::with_capacity(1);
805 value.push_own(&Default::default());
806 value
807 }
808 }
809
810 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> WithLayout for OrdKeyBatch<L> {
811 type Layout = L;
812 }
813
814 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> BatchReader for OrdKeyBatch<L> {
815
816 type Cursor = OrdKeyCursor<L>;
817 fn cursor(&self) -> Self::Cursor {
818 OrdKeyCursor {
819 key_cursor: 0,
820 val_stepped: false,
821 phantom: std::marker::PhantomData,
822 }
823 }
824 fn len(&self) -> usize {
825 self.updates
828 }
829 fn description(&self) -> &Description<layout::Time<L>> { &self.description }
830 }
831
832 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Batch for OrdKeyBatch<L> {
833 type Merger = OrdKeyMerger<L>;
834
835 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
836 OrdKeyMerger::new(self, other, compaction_frontier)
837 }
838
839 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
840 use timely::progress::Timestamp;
841 Self {
842 storage: OrdKeyStorage {
843 keys: L::KeyContainer::with_capacity(0),
844 upds: Upds::default(),
845 },
846 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
847 updates: 0,
848 value: Self::create_value(),
849 }
850 }
851 }
852
853 pub struct OrdKeyMerger<L: Layout> {
855 key_cursor1: usize,
857 key_cursor2: usize,
859 result: OrdKeyStorage<L>,
861 description: Description<layout::Time<L>>,
863
864 staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
866 }
867
868 impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
869 where
870 OrdKeyBatch<L>: Batch<Time=layout::Time<L>>,
871 {
872 fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {
873
874 assert!(batch1.upper() == batch2.lower());
875 use crate::lattice::Lattice;
876 let mut since = batch1.description().since().join(batch2.description().since());
877 since = since.join(&compaction_frontier.to_owned());
878
879 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
880
881 let batch1 = &batch1.storage;
882 let batch2 = &batch2.storage;
883
884 OrdKeyMerger {
885 key_cursor1: 0,
886 key_cursor2: 0,
887 result: OrdKeyStorage {
888 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
889 upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
890 },
891 description,
892 staging: UpdsBuilder::default(),
893 }
894 }
895 fn done(self) -> OrdKeyBatch<L> {
896 OrdKeyBatch {
897 updates: self.staging.total(),
898 storage: self.result,
899 description: self.description,
900 value: OrdKeyBatch::<L>::create_value(),
901 }
902 }
903 fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
904
905 let starting_updates = self.staging.total();
907 let mut effort = 0isize;
908
909 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
911 self.merge_key(&source1.storage, &source2.storage);
912 effort = (self.staging.total() - starting_updates) as isize;
914 }
915
916 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
919 self.copy_key(&source1.storage, self.key_cursor1);
920 self.key_cursor1 += 1;
921 effort = (self.staging.total() - starting_updates) as isize;
922 }
923 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
924 self.copy_key(&source2.storage, self.key_cursor2);
925 self.key_cursor2 += 1;
926 effort = (self.staging.total() - starting_updates) as isize;
927 }
928
929 *fuel -= effort;
930 }
931 }
932
933 impl<L: Layout> OrdKeyMerger<L> {
935 fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
943 self.stash_updates_for_key(source, cursor);
944 if self.staging.seal(&mut self.result.upds) {
945 self.result.keys.push_ref(source.keys.index(cursor));
946 }
947 }
948 fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
953 use ::std::cmp::Ordering;
954 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
955 Ordering::Less => {
956 self.copy_key(source1, self.key_cursor1);
957 self.key_cursor1 += 1;
958 },
959 Ordering::Equal => {
960 self.stash_updates_for_key(source1, self.key_cursor1);
962 self.stash_updates_for_key(source2, self.key_cursor2);
963 if self.staging.seal(&mut self.result.upds) {
964 self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
965 }
966 self.key_cursor1 += 1;
968 self.key_cursor2 += 1;
969 },
970 Ordering::Greater => {
971 self.copy_key(source2, self.key_cursor2);
972 self.key_cursor2 += 1;
973 },
974 }
975 }
976
977 fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
979 let (lower, upper) = source.upds.bounds(index);
980 for i in lower .. upper {
981 let (time, diff) = source.upds.get_abs(i);
983 use crate::lattice::Lattice;
984 let mut new_time = L::TimeContainer::into_owned(time);
985 new_time.advance_by(self.description.since().borrow());
986 self.staging.push(new_time, L::DiffContainer::into_owned(diff));
987 }
988 }
989 }
990
991 pub struct OrdKeyCursor<L: Layout> {
993 key_cursor: usize,
995 val_stepped: bool,
997 phantom: PhantomData<L>,
999 }
1000
1001 use crate::trace::implementations::WithLayout;
1002 impl<L: Layout<ValContainer: BatchContainer>> WithLayout for OrdKeyCursor<L> {
1003 type Layout = L;
1004 }
1005
1006 impl<L: for<'a> Layout<ValContainer: BatchContainer<Owned: Default>>> Cursor for OrdKeyCursor<L> {
1007
1008 type Storage = OrdKeyBatch<L>;
1009
1010 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
1011 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
1012
1013 fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
1014 fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { storage.value.index(0) }
1015 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
1016 let (lower, upper) = storage.storage.upds.bounds(self.key_cursor);
1017 for index in lower .. upper {
1018 let (time, diff) = storage.storage.upds.get_abs(index);
1019 logic(time, diff);
1020 }
1021 }
1022 fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
1023 fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
1024 fn step_key(&mut self, storage: &Self::Storage){
1025 self.key_cursor += 1;
1026 if self.key_valid(storage) {
1027 self.rewind_vals(storage);
1028 }
1029 else {
1030 self.key_cursor = storage.storage.keys.len();
1031 }
1032 }
1033 fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
1034 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
1035 if self.key_valid(storage) {
1036 self.rewind_vals(storage);
1037 }
1038 }
1039 fn step_val(&mut self, _storage: &Self::Storage) {
1040 self.val_stepped = true;
1041 }
1042 fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
1043 fn rewind_keys(&mut self, storage: &Self::Storage) {
1044 self.key_cursor = 0;
1045 if self.key_valid(storage) {
1046 self.rewind_vals(storage)
1047 }
1048 }
1049 fn rewind_vals(&mut self, _storage: &Self::Storage) {
1050 self.val_stepped = false;
1051 }
1052 }
1053
1054 pub struct OrdKeyBuilder<L: Layout, CI> {
1056 pub result: OrdKeyStorage<L>,
1060 staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
1061 _marker: PhantomData<CI>,
1062 }
1063
1064 impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
1065 where
1066 L: for<'a> Layout<KeyContainer: PushInto<CI::Key<'a>>>,
1067 L: Layout<ValContainer: BatchContainer<Owned: Default>>,
1068 CI: BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
1069 {
1070
1071 type Input = CI;
1072 type Time = layout::Time<L>;
1073 type Output = OrdKeyBatch<L>;
1074
1075 fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
1076 Self {
1077 result: OrdKeyStorage {
1078 keys: L::KeyContainer::with_capacity(keys),
1079 upds: Upds::with_capacity(keys+1, upds),
1080 },
1081 staging: UpdsBuilder::default(),
1082 _marker: PhantomData,
1083 }
1084 }
1085
1086 #[inline]
1087 fn push(&mut self, chunk: &mut Self::Input) {
1088 for item in chunk.drain() {
1089 let (key, _val, time, diff) = CI::into_parts(item);
1090 if self.result.keys.is_empty() {
1091 self.result.keys.push_into(key);
1092 self.staging.push(time, diff);
1093 }
1094 else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
1096 self.staging.push(time, diff);
1097 } else {
1098 self.staging.seal(&mut self.result.upds);
1099 self.staging.push(time, diff);
1100 self.result.keys.push_into(key);
1101 }
1102 }
1103 }
1104
1105 #[inline(never)]
1106 fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
1107 self.staging.seal(&mut self.result.upds);
1108 OrdKeyBatch {
1109 updates: self.staging.total(),
1110 storage: self.result,
1111 description,
1112 value: OrdKeyBatch::<L>::create_value(),
1113 }
1114 }
1115
1116 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
1117 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
1118 let mut builder = Self::with_capacity(keys, vals, upds);
1119 for mut chunk in chain.drain(..) {
1120 builder.push(&mut chunk);
1121 }
1122
1123 builder.done(description)
1124 }
1125 }
1126
1127}