1use std::rc::Rc;
12
13use crate::containers::TimelyStack;
14use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
15use crate::trace::implementations::spine_fueled::Spine;
16use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
17use crate::trace::rc_blanket_impls::RcBuilder;
18
19use super::{Update, Layout, Vector, TStack, Preferred};
20
21pub use self::val_batch::{OrdValBatch, OrdValBuilder};
22pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
23
/// A `Spine` of reference-counted `OrdValBatch`es that use the `Vector` layout.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
/// A `MergeBatcher` over `Vec`s of `((K, V), T, R)` updates, chunked by `VecChunker` and merged by `VecMerger`.
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
/// An `OrdValBuilder` for the `Vector` layout that reads `Vec` chunks, wrapped in `RcBuilder`.
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

/// A `Spine` of reference-counted `OrdValBatch`es that use the `TStack` layout.
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
/// A `MergeBatcher` over `Vec`s of `((K, V), T, R)` updates, chunked by `ColumnationChunker` and merged by `ColMerger`.
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// An `OrdValBuilder` for the `TStack` layout that reads `TimelyStack` chunks, wrapped in `RcBuilder`.
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

/// A `Spine` of reference-counted `OrdKeyBatch`es that use the `Vector` layout; the value type is `()`.
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
/// A `MergeBatcher` over `Vec`s of `((K, ()), T, R)` updates, chunked by `VecChunker` and merged by `VecMerger`.
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>;
/// An `OrdKeyBuilder` for the `Vector` layout that reads `Vec` chunks, wrapped in `RcBuilder`.
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;

/// A `Spine` of reference-counted `OrdKeyBatch`es that use the `TStack` layout; the value type is `()`.
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
/// A `MergeBatcher` over `Vec`s of `((K, ()), T, R)` updates, chunked by `ColumnationChunker` and merged by `ColMerger`.
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>;
/// An `OrdKeyBuilder` for the `TStack` layout that reads `TimelyStack` chunks, wrapped in `RcBuilder`.
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;

/// A `Spine` of reference-counted `OrdValBatch`es that use the `Preferred` layout.
pub type PreferredSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Preferred<K,V,T,R>>>>;
/// A `MergeBatcher` over `Vec`s of updates whose key and value are the `ToOwned::Owned` forms of `K` and `V`,
/// chunked by `ColumnationChunker` and merged by `ColMerger`.
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColMerger<(<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R>>;
/// An `OrdValBuilder` for the `Preferred` layout that reads `TimelyStack` chunks of owned updates, wrapped in `RcBuilder`.
pub type PreferredBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Preferred<K,V,T,R>, TimelyStack<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>>;
64
65mod val_batch {
70
71 use std::marker::PhantomData;
72 use serde::{Deserialize, Serialize};
73 use timely::container::PushInto;
74 use timely::progress::{Antichain, frontier::AntichainRef};
75
76 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
77 use crate::trace::implementations::{BatchContainer, BuilderInput};
78 use crate::IntoOwned;
79
80 use super::{Layout, Update};
81
    /// Flat storage for keys, the values of each key, and the updates of each value.
    ///
    /// Keys, values, times, and diffs live in separate containers. The two offset
    /// containers record where each key's values and each value's updates start and stop.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct OrdValStorage<L: Layout> {
        /// The keys. The merger and cursor in this module compare and seek them as a sorted list.
        pub keys: L::KeyContainer,
        /// Offsets from keys into `vals`.
        ///
        /// The values of the key at position `i` are bracketed by entries `i` and `i + 1`,
        /// so this container holds one more entry than `keys` once a batch is complete.
        pub keys_offs: L::OffsetContainer,
        /// The values of all keys, concatenated; bracketed per key by `keys_offs`.
        pub vals: L::ValContainer,
        /// Offsets from values into `times` and `diffs`.
        ///
        /// The updates of the value at position `i` are bracketed by entries `i` and `i + 1`.
        /// An empty range is special: it stands for the single update just before the range,
        /// as if the start were decremented by one (see `updates_for_value`).
        pub vals_offs: L::OffsetContainer,
        /// The update times of all values, concatenated; bracketed per value by `vals_offs`.
        pub times: L::TimeContainer,
        /// The update diffs of all values, concatenated; bracketed per value by `vals_offs`.
        pub diffs: L::DiffContainer,
    }
107
108 impl<L: Layout> OrdValStorage<L> {
109 fn values_for_key(&self, index: usize) -> (usize, usize) {
111 (self.keys_offs.index(index), self.keys_offs.index(index+1))
112 }
113 fn updates_for_value(&self, index: usize) -> (usize, usize) {
115 let mut lower = self.vals_offs.index(index);
116 let upper = self.vals_offs.index(index+1);
117 if lower == upper {
120 assert!(lower > 0);
121 lower -= 1;
122 }
123 (lower, upper)
124 }
125 }
126
127 #[derive(Serialize, Deserialize)]
132 #[serde(bound = "
133 L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
134 L::ValContainer: Serialize + for<'a> Deserialize<'a>,
135 L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
136 L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
137 L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
138 ")]
    /// A batch of updates organized by key, then value, then `(time, diff)`.
    pub struct OrdValBatch<L: Layout> {
        /// The keys, values, and updates of the batch.
        pub storage: OrdValStorage<L>,
        /// The description of the batch, constructed from `lower`, `upper`, and `since` frontiers.
        pub description: Description<<L::Target as Update>::Time>,
        /// The number of updates the batch represents.
        ///
        /// This can exceed `storage.times.len()`: the builder and merger in this module set it
        /// to that length plus the number of updates they encoded as singletons.
        pub updates: usize,
    }
151
152 impl<L: Layout> BatchReader for OrdValBatch<L> {
153 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
154 type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
155 type Time = <L::Target as Update>::Time;
156 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
157 type Diff = <L::Target as Update>::Diff;
158 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
159
160 type Cursor = OrdValCursor<L>;
161 fn cursor(&self) -> Self::Cursor {
162 OrdValCursor {
163 key_cursor: 0,
164 val_cursor: 0,
165 phantom: PhantomData,
166 }
167 }
168 fn len(&self) -> usize {
169 self.updates
172 }
173 fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
174 }
175
176 impl<L: Layout> Batch for OrdValBatch<L> {
177 type Merger = OrdValMerger<L>;
178
179 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
180 OrdValMerger::new(self, other, compaction_frontier)
181 }
182
183 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
184 use timely::progress::Timestamp;
185 Self {
186 storage: OrdValStorage {
187 keys: L::KeyContainer::with_capacity(0),
188 keys_offs: L::OffsetContainer::with_capacity(0),
189 vals: L::ValContainer::with_capacity(0),
190 vals_offs: L::OffsetContainer::with_capacity(0),
191 times: L::TimeContainer::with_capacity(0),
192 diffs: L::DiffContainer::with_capacity(0),
193 },
194 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
195 updates: 0,
196 }
197 }
198 }
199
    /// State for an in-progress merge of two `OrdValBatch`es.
    pub struct OrdValMerger<L: Layout> {
        /// Position of the next key to process in the first batch.
        key_cursor1: usize,
        /// Position of the next key to process in the second batch.
        key_cursor2: usize,
        /// The merged output built so far.
        result: OrdValStorage<L>,
        /// Description of the merged output.
        description: Description<<L::Target as Update>::Time>,

        /// Scratch space for the updates of one value: filled by `stash_updates_for_val`,
        /// then consolidated and emptied by `consolidate_updates`.
        update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// The number of updates recorded as singletons, i.e. not written to `result.times`/`result.diffs`.
        singletons: usize,
    }
219
220 impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
221 where
222 OrdValBatch<L>: Batch<Time=<L::Target as Update>::Time>,
223 for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
224 for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
225 {
226 fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
227
228 assert!(batch1.upper() == batch2.lower());
229 use crate::lattice::Lattice;
230 let mut since = batch1.description().since().join(batch2.description().since());
231 since = since.join(&compaction_frontier.to_owned());
232
233 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
234
235 let batch1 = &batch1.storage;
236 let batch2 = &batch2.storage;
237
238 let mut storage = OrdValStorage {
239 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
240 keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
241 vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
242 vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
243 times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
244 diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
245 };
246
247 let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
249 keys_offs.push(0);
250 let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
251 vals_offs.push(0);
252
253 OrdValMerger {
254 key_cursor1: 0,
255 key_cursor2: 0,
256 result: storage,
257 description,
258 update_stash: Vec::new(),
259 singletons: 0,
260 }
261 }
262 fn done(self) -> OrdValBatch<L> {
263 OrdValBatch {
264 updates: self.result.times.len() + self.singletons,
265 storage: self.result,
266 description: self.description,
267 }
268 }
269 fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
270
271 let starting_updates = self.result.times.len();
273 let mut effort = 0isize;
274
275 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
277 self.merge_key(&source1.storage, &source2.storage);
278 effort = (self.result.times.len() - starting_updates) as isize;
280 }
281
282 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
285 self.copy_key(&source1.storage, self.key_cursor1);
286 self.key_cursor1 += 1;
287 effort = (self.result.times.len() - starting_updates) as isize;
288 }
289 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
290 self.copy_key(&source2.storage, self.key_cursor2);
291 self.key_cursor2 += 1;
292 effort = (self.result.times.len() - starting_updates) as isize;
293 }
294
295 *fuel -= effort;
296 }
297 }
298
299 impl<L: Layout> OrdValMerger<L> {
301 fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
309 let init_vals = self.result.vals.len();
311 let (mut lower, upper) = source.values_for_key(cursor);
312 while lower < upper {
313 self.stash_updates_for_val(source, lower);
314 if let Some(off) = self.consolidate_updates() {
315 self.result.vals_offs.push(off);
316 self.result.vals.push(source.vals.index(lower));
317 }
318 lower += 1;
319 }
320
321 if self.result.vals.len() > init_vals {
323 self.result.keys.push(source.keys.index(cursor));
324 self.result.keys_offs.push(self.result.vals.len());
325 }
326 }
327 fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
332 use ::std::cmp::Ordering;
333 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
334 Ordering::Less => {
335 self.copy_key(source1, self.key_cursor1);
336 self.key_cursor1 += 1;
337 },
338 Ordering::Equal => {
339 let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
341 let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
342 if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
343 self.result.keys.push(source1.keys.index(self.key_cursor1));
344 self.result.keys_offs.push(off);
345 }
346 self.key_cursor1 += 1;
348 self.key_cursor2 += 1;
349 },
350 Ordering::Greater => {
351 self.copy_key(source2, self.key_cursor2);
352 self.key_cursor2 += 1;
353 },
354 }
355 }
356 fn merge_vals(
361 &mut self,
362 (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
363 (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
364 ) -> Option<usize> {
365 let init_vals = self.result.vals.len();
367 while lower1 < upper1 && lower2 < upper2 {
368 use ::std::cmp::Ordering;
372 match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
373 Ordering::Less => {
374 self.stash_updates_for_val(source1, lower1);
376 if let Some(off) = self.consolidate_updates() {
377 self.result.vals_offs.push(off);
378 self.result.vals.push(source1.vals.index(lower1));
379 }
380 lower1 += 1;
381 },
382 Ordering::Equal => {
383 self.stash_updates_for_val(source1, lower1);
384 self.stash_updates_for_val(source2, lower2);
385 if let Some(off) = self.consolidate_updates() {
386 self.result.vals_offs.push(off);
387 self.result.vals.push(source1.vals.index(lower1));
388 }
389 lower1 += 1;
390 lower2 += 1;
391 },
392 Ordering::Greater => {
393 self.stash_updates_for_val(source2, lower2);
395 if let Some(off) = self.consolidate_updates() {
396 self.result.vals_offs.push(off);
397 self.result.vals.push(source2.vals.index(lower2));
398 }
399 lower2 += 1;
400 },
401 }
402 }
403 while lower1 < upper1 {
405 self.stash_updates_for_val(source1, lower1);
406 if let Some(off) = self.consolidate_updates() {
407 self.result.vals_offs.push(off);
408 self.result.vals.push(source1.vals.index(lower1));
409 }
410 lower1 += 1;
411 }
412 while lower2 < upper2 {
413 self.stash_updates_for_val(source2, lower2);
414 if let Some(off) = self.consolidate_updates() {
415 self.result.vals_offs.push(off);
416 self.result.vals.push(source2.vals.index(lower2));
417 }
418 lower2 += 1;
419 }
420
421 if self.result.vals.len() > init_vals {
423 Some(self.result.vals.len())
424 } else {
425 None
426 }
427 }
428
429 fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
431 let (lower, upper) = source.updates_for_value(index);
432 for i in lower .. upper {
433 let time = source.times.index(i);
435 let diff = source.diffs.index(i);
436 use crate::lattice::Lattice;
437 let mut new_time: <L::Target as Update>::Time = time.into_owned();
438 new_time.advance_by(self.description.since().borrow());
439 self.update_stash.push((new_time, diff.into_owned()));
440 }
441 }
442
443 fn consolidate_updates(&mut self) -> Option<usize> {
445 use crate::consolidation;
446 consolidation::consolidate(&mut self.update_stash);
447 if !self.update_stash.is_empty() {
448 let time_diff = self.result.times.last().zip(self.result.diffs.last());
451 let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
452 let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
453 let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
454 t1.eq(&t2) && d1.eq(&d2)
455 });
456 if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
457 self.update_stash.clear();
459 self.singletons += 1;
460 }
461 else {
462 for (time, diff) in self.update_stash.drain(..) {
464 self.result.times.push(time);
465 self.result.diffs.push(diff);
466 }
467 }
468 Some(self.result.times.len())
469 } else {
470 None
471 }
472 }
473 }
474
    /// A cursor over an `OrdValBatch`; it holds positions only, and each method is passed the batch to read.
    pub struct OrdValCursor<L: Layout> {
        /// Position of the current key in `storage.keys`.
        key_cursor: usize,
        /// Position of the current value in `storage.vals`.
        val_cursor: usize,
        /// Ties the cursor to the layout `L` without storing a value of that type.
        phantom: PhantomData<L>,
    }
484
485 impl<L: Layout> Cursor for OrdValCursor<L> {
486
487 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
488 type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
489 type Time = <L::Target as Update>::Time;
490 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
491 type Diff = <L::Target as Update>::Diff;
492 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
493
494 type Storage = OrdValBatch<L>;
495
496 fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
497 fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
498 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
499 let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
500 for index in lower .. upper {
501 let time = storage.storage.times.index(index);
502 let diff = storage.storage.diffs.index(index);
503 logic(time, diff);
504 }
505 }
506 fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
507 fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
508 fn step_key(&mut self, storage: &OrdValBatch<L>){
509 self.key_cursor += 1;
510 if self.key_valid(storage) {
511 self.rewind_vals(storage);
512 }
513 else {
514 self.key_cursor = storage.storage.keys.len();
515 }
516 }
517 fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
518 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
519 if self.key_valid(storage) {
520 self.rewind_vals(storage);
521 }
522 }
523 fn step_val(&mut self, storage: &OrdValBatch<L>) {
524 self.val_cursor += 1;
525 if !self.val_valid(storage) {
526 self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
527 }
528 }
529 fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
530 self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
531 }
532 fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
533 self.key_cursor = 0;
534 if self.key_valid(storage) {
535 self.rewind_vals(storage)
536 }
537 }
538 fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
539 self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
540 }
541 }
542
    /// A builder that assembles an `OrdValBatch` from input chunks of type `CI`.
    pub struct OrdValBuilder<L: Layout, CI> {
        /// The storage being assembled.
        result: OrdValStorage<L>,
        /// An update held back by `push_update` because it equals the last stored `(time, diff)`.
        ///
        /// It is either counted as a singleton when the current value ends, or written to
        /// storage if another update arrives for the same value.
        singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// The number of updates counted as singletons and so not present in `result.times`/`result.diffs`.
        singletons: usize,
        /// Ties the builder to its input type `CI` without storing a value of that type.
        _marker: PhantomData<CI>,
    }
554
555 impl<L: Layout, CI> OrdValBuilder<L, CI> {
556 fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
568 if self.result.times.last().map(|t| t == <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time)) == Some(true) &&
570 self.result.diffs.last().map(|d| d == <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff)) == Some(true)
571 {
572 assert!(self.singleton.is_none());
573 self.singleton = Some((time, diff));
574 }
575 else {
576 if let Some((time, diff)) = self.singleton.take() {
578 self.result.times.push(time);
579 self.result.diffs.push(diff);
580 }
581 self.result.times.push(time);
582 self.result.diffs.push(diff);
583 }
584 }
585 }
586
587 impl<L, CI> Builder for OrdValBuilder<L, CI>
588 where
589 L: Layout,
590 CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
591 for<'a> L::KeyContainer: PushInto<CI::Key<'a>>,
592 for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
593 for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
594 for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
595 {
596
597 type Input = CI;
598 type Time = <L::Target as Update>::Time;
599 type Output = OrdValBatch<L>;
600
601 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
602 Self {
604 result: OrdValStorage {
605 keys: L::KeyContainer::with_capacity(keys),
606 keys_offs: L::OffsetContainer::with_capacity(keys + 1),
607 vals: L::ValContainer::with_capacity(vals),
608 vals_offs: L::OffsetContainer::with_capacity(vals + 1),
609 times: L::TimeContainer::with_capacity(upds),
610 diffs: L::DiffContainer::with_capacity(upds),
611 },
612 singleton: None,
613 singletons: 0,
614 _marker: PhantomData,
615 }
616 }
617
618 #[inline]
619 fn push(&mut self, chunk: &mut Self::Input) {
620 for item in chunk.drain() {
621 let (key, val, time, diff) = CI::into_parts(item);
622 if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
624 if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
626 self.push_update(time, diff);
627 } else {
628 self.result.vals_offs.push(self.result.times.len());
630 if self.singleton.take().is_some() { self.singletons += 1; }
631 self.push_update(time, diff);
632 self.result.vals.push(val);
633 }
634 } else {
635 self.result.vals_offs.push(self.result.times.len());
637 if self.singleton.take().is_some() { self.singletons += 1; }
638 self.result.keys_offs.push(self.result.vals.len());
639 self.push_update(time, diff);
640 self.result.vals.push(val);
641 self.result.keys.push(key);
642 }
643 }
644 }
645
646 #[inline(never)]
647 fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
648 self.result.vals_offs.push(self.result.times.len());
650 if self.singleton.take().is_some() { self.singletons += 1; }
652 self.result.keys_offs.push(self.result.vals.len());
653 OrdValBatch {
654 updates: self.result.times.len() + self.singletons,
655 storage: self.result,
656 description,
657 }
658 }
659
660 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
661 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
662 let mut builder = Self::with_capacity(keys, vals, upds);
663 for mut chunk in chain.drain(..) {
664 builder.push(&mut chunk);
665 }
666
667 builder.done(description)
668 }
669 }
670}
671
672mod key_batch {
673
674 use std::marker::PhantomData;
675 use serde::{Deserialize, Serialize};
676 use timely::container::PushInto;
677 use timely::progress::{Antichain, frontier::AntichainRef};
678
679 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
680 use crate::trace::implementations::{BatchContainer, BuilderInput};
681 use crate::IntoOwned;
682
683 use super::{Layout, Update};
684
    /// Flat storage for keys and the updates of each key.
    ///
    /// Keys, times, and diffs live in separate containers. The offset container records
    /// where each key's updates start and stop.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct OrdKeyStorage<L: Layout> {
        /// The keys. The merger and cursor in this module compare and seek them as a sorted list.
        pub keys: L::KeyContainer,
        /// Offsets from keys into `times` and `diffs`.
        ///
        /// The updates of the key at position `i` are bracketed by entries `i` and `i + 1`.
        /// An empty range is special: it stands for the single update just before the range,
        /// as if the start were decremented by one (see `updates_for_key`).
        pub keys_offs: L::OffsetContainer,
        /// The update times of all keys, concatenated; bracketed per key by `keys_offs`.
        pub times: L::TimeContainer,
        /// The update diffs of all keys, concatenated; bracketed per key by `keys_offs`.
        pub diffs: L::DiffContainer,
    }
704
705 impl<L: Layout> OrdKeyStorage<L> {
706 fn updates_for_key(&self, index: usize) -> (usize, usize) {
708 let mut lower = self.keys_offs.index(index);
709 let upper = self.keys_offs.index(index+1);
710 if lower == upper {
713 assert!(lower > 0);
714 lower -= 1;
715 }
716 (lower, upper)
717 }
718 }
719
720 #[derive(Serialize, Deserialize)]
725 #[serde(bound = "
726 L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
727 L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
728 L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
729 L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
730 ")]
    /// A batch of updates organized by key, then `(time, diff)`; it stores no values.
    pub struct OrdKeyBatch<L: Layout> {
        /// The keys and updates of the batch.
        pub storage: OrdKeyStorage<L>,
        /// The description of the batch, constructed from `lower`, `upper`, and `since` frontiers.
        pub description: Description<<L::Target as Update>::Time>,
        /// The number of updates the batch represents.
        ///
        /// This can exceed `storage.times.len()`: the builder and merger in this module set it
        /// to that length plus the number of updates they encoded as singletons.
        pub updates: usize,
    }
743
744 impl<L: Layout> BatchReader for OrdKeyBatch<L> {
745
746 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
747 type Val<'a> = &'a ();
748 type Time = <L::Target as Update>::Time;
749 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
750 type Diff = <L::Target as Update>::Diff;
751 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
752
753 type Cursor = OrdKeyCursor<L>;
754 fn cursor(&self) -> Self::Cursor {
755 OrdKeyCursor {
756 key_cursor: 0,
757 val_stepped: false,
758 phantom: std::marker::PhantomData,
759 }
760 }
761 fn len(&self) -> usize {
762 self.updates
765 }
766 fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
767 }
768
769 impl<L: Layout> Batch for OrdKeyBatch<L> {
770 type Merger = OrdKeyMerger<L>;
771
772 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
773 OrdKeyMerger::new(self, other, compaction_frontier)
774 }
775
776 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
777 use timely::progress::Timestamp;
778 Self {
779 storage: OrdKeyStorage {
780 keys: L::KeyContainer::with_capacity(0),
781 keys_offs: L::OffsetContainer::with_capacity(0),
782 times: L::TimeContainer::with_capacity(0),
783 diffs: L::DiffContainer::with_capacity(0),
784 },
785 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
786 updates: 0,
787 }
788 }
789 }
790
    /// State for an in-progress merge of two `OrdKeyBatch`es.
    pub struct OrdKeyMerger<L: Layout> {
        /// Position of the next key to process in the first batch.
        key_cursor1: usize,
        /// Position of the next key to process in the second batch.
        key_cursor2: usize,
        /// The merged output built so far.
        result: OrdKeyStorage<L>,
        /// Description of the merged output.
        description: Description<<L::Target as Update>::Time>,

        /// Scratch space for the updates of one key: filled by `stash_updates_for_key`,
        /// then consolidated and emptied by `consolidate_updates`.
        update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// The number of updates recorded as singletons, i.e. not written to `result.times`/`result.diffs`.
        singletons: usize,
    }
810
811 impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
812 where
813 OrdKeyBatch<L>: Batch<Time=<L::Target as Update>::Time>,
814 for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
815 for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
816 {
817 fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
818
819 assert!(batch1.upper() == batch2.lower());
820 use crate::lattice::Lattice;
821 let mut since = batch1.description().since().join(batch2.description().since());
822 since = since.join(&compaction_frontier.to_owned());
823
824 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
825
826 let batch1 = &batch1.storage;
827 let batch2 = &batch2.storage;
828
829 let mut storage = OrdKeyStorage {
830 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
831 keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
832 times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
833 diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
834 };
835
836 let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
837 keys_offs.push(0);
838
839 OrdKeyMerger {
840 key_cursor1: 0,
841 key_cursor2: 0,
842 result: storage,
843 description,
844 update_stash: Vec::new(),
845 singletons: 0,
846 }
847 }
848 fn done(self) -> OrdKeyBatch<L> {
849 OrdKeyBatch {
850 updates: self.result.times.len() + self.singletons,
851 storage: self.result,
852 description: self.description,
853 }
854 }
855 fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
856
857 let starting_updates = self.result.times.len();
859 let mut effort = 0isize;
860
861 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
863 self.merge_key(&source1.storage, &source2.storage);
864 effort = (self.result.times.len() - starting_updates) as isize;
866 }
867
868 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
871 self.copy_key(&source1.storage, self.key_cursor1);
872 self.key_cursor1 += 1;
873 effort = (self.result.times.len() - starting_updates) as isize;
874 }
875 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
876 self.copy_key(&source2.storage, self.key_cursor2);
877 self.key_cursor2 += 1;
878 effort = (self.result.times.len() - starting_updates) as isize;
879 }
880
881 *fuel -= effort;
882 }
883 }
884
885 impl<L: Layout> OrdKeyMerger<L> {
887 fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
895 self.stash_updates_for_key(source, cursor);
896 if let Some(off) = self.consolidate_updates() {
897 self.result.keys_offs.push(off);
898 self.result.keys.push(source.keys.index(cursor));
899 }
900 }
901 fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
906 use ::std::cmp::Ordering;
907 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
908 Ordering::Less => {
909 self.copy_key(source1, self.key_cursor1);
910 self.key_cursor1 += 1;
911 },
912 Ordering::Equal => {
913 self.stash_updates_for_key(source1, self.key_cursor1);
915 self.stash_updates_for_key(source2, self.key_cursor2);
916 if let Some(off) = self.consolidate_updates() {
917 self.result.keys_offs.push(off);
918 self.result.keys.push(source1.keys.index(self.key_cursor1));
919 }
920 self.key_cursor1 += 1;
922 self.key_cursor2 += 1;
923 },
924 Ordering::Greater => {
925 self.copy_key(source2, self.key_cursor2);
926 self.key_cursor2 += 1;
927 },
928 }
929 }
930
931 fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
933 let (lower, upper) = source.updates_for_key(index);
934 for i in lower .. upper {
935 let time = source.times.index(i);
937 let diff = source.diffs.index(i);
938 use crate::lattice::Lattice;
939 let mut new_time = time.into_owned();
940 new_time.advance_by(self.description.since().borrow());
941 self.update_stash.push((new_time, diff.into_owned()));
942 }
943 }
944
945 fn consolidate_updates(&mut self) -> Option<usize> {
947 use crate::consolidation;
948 consolidation::consolidate(&mut self.update_stash);
949 if !self.update_stash.is_empty() {
950 let time_diff = self.result.times.last().zip(self.result.diffs.last());
953 let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
954 let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
955 let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
956 t1.eq(&t2) && d1.eq(&d2)
957 });
958 if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
959 self.update_stash.clear();
961 self.singletons += 1;
962 }
963 else {
964 for (time, diff) in self.update_stash.drain(..) {
966 self.result.times.push(time);
967 self.result.diffs.push(diff);
968 }
969 }
970 Some(self.result.times.len())
971 } else {
972 None
973 }
974 }
975 }
976
    /// A cursor over an `OrdKeyBatch`; it holds positions only, and each method is passed the batch to read.
    pub struct OrdKeyCursor<L: Layout> {
        /// Position of the current key in `storage.keys`.
        key_cursor: usize,
        /// Whether the single `()` value of the current key has been stepped past.
        val_stepped: bool,
        /// Ties the cursor to the layout `L` without storing a value of that type.
        phantom: PhantomData<L>,
    }
986
987 impl<L: Layout> Cursor for OrdKeyCursor<L> {
988 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
989 type Val<'a> = &'a ();
990 type Time = <L::Target as Update>::Time;
991 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
992 type Diff = <L::Target as Update>::Diff;
993 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
994
995 type Storage = OrdKeyBatch<L>;
996
997 fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
998 fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() }
999 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
1000 let (lower, upper) = storage.storage.updates_for_key(self.key_cursor);
1001 for index in lower .. upper {
1002 let time = storage.storage.times.index(index);
1003 let diff = storage.storage.diffs.index(index);
1004 logic(time, diff);
1005 }
1006 }
1007 fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
1008 fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
1009 fn step_key(&mut self, storage: &Self::Storage){
1010 self.key_cursor += 1;
1011 if self.key_valid(storage) {
1012 self.rewind_vals(storage);
1013 }
1014 else {
1015 self.key_cursor = storage.storage.keys.len();
1016 }
1017 }
1018 fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
1019 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
1020 if self.key_valid(storage) {
1021 self.rewind_vals(storage);
1022 }
1023 }
1024 fn step_val(&mut self, _storage: &Self::Storage) {
1025 self.val_stepped = true;
1026 }
1027 fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
1028 fn rewind_keys(&mut self, storage: &Self::Storage) {
1029 self.key_cursor = 0;
1030 if self.key_valid(storage) {
1031 self.rewind_vals(storage)
1032 }
1033 }
1034 fn rewind_vals(&mut self, _storage: &Self::Storage) {
1035 self.val_stepped = false;
1036 }
1037 }
1038
    /// A builder that assembles an `OrdKeyBatch` from input chunks of type `CI`.
    pub struct OrdKeyBuilder<L: Layout, CI> {
        /// The storage being assembled.
        result: OrdKeyStorage<L>,
        /// An update held back by `push_update` because it equals the last stored `(time, diff)`.
        ///
        /// It is either counted as a singleton when the current key ends, or written to
        /// storage if another update arrives for the same key.
        singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// The number of updates counted as singletons and so not present in `result.times`/`result.diffs`.
        singletons: usize,
        /// Ties the builder to its input type `CI` without storing a value of that type.
        _marker: PhantomData<CI>,
    }
1050
1051 impl<L: Layout, CI> OrdKeyBuilder<L, CI> {
1052 fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
1064 let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time);
1066 let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff);
1067 if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) {
1068 assert!(self.singleton.is_none());
1069 self.singleton = Some((time, diff));
1070 }
1071 else {
1072 if let Some((time, diff)) = self.singleton.take() {
1074 self.result.times.push(time);
1075 self.result.diffs.push(diff);
1076 }
1077 self.result.times.push(time);
1078 self.result.diffs.push(diff);
1079 }
1080 }
1081 }
1082
1083 impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
1084 where
1085 L: Layout,
1086 CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
1087 for<'a> L::KeyContainer: PushInto<CI::Key<'a>>,
1088 for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
1089 for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
1090 {
1091
1092 type Input = CI;
1093 type Time = <L::Target as Update>::Time;
1094 type Output = OrdKeyBatch<L>;
1095
1096 fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
1097 Self {
1099 result: OrdKeyStorage {
1100 keys: L::KeyContainer::with_capacity(keys),
1101 keys_offs: L::OffsetContainer::with_capacity(keys + 1),
1102 times: L::TimeContainer::with_capacity(upds),
1103 diffs: L::DiffContainer::with_capacity(upds),
1104 },
1105 singleton: None,
1106 singletons: 0,
1107 _marker: PhantomData,
1108 }
1109 }
1110
1111 #[inline]
1112 fn push(&mut self, chunk: &mut Self::Input) {
1113 for item in chunk.drain() {
1114 let (key, _val, time, diff) = CI::into_parts(item);
1115 if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
1117 self.push_update(time, diff);
1118 } else {
1119 self.result.keys_offs.push(self.result.times.len());
1121 if self.singleton.take().is_some() { self.singletons += 1; }
1123 self.push_update(time, diff);
1124 self.result.keys.push(key);
1125 }
1126 }
1127 }
1128
1129 #[inline(never)]
1130 fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
1131 self.result.keys_offs.push(self.result.times.len());
1133 if self.singleton.take().is_some() { self.singletons += 1; }
1135 OrdKeyBatch {
1136 updates: self.result.times.len() + self.singletons,
1137 storage: self.result,
1138 description,
1139 }
1140 }
1141
1142 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
1143 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
1144 let mut builder = Self::with_capacity(keys, vals, upds);
1145 for mut chunk in chain.drain(..) {
1146 builder.push(&mut chunk);
1147 }
1148
1149 builder.done(description)
1150 }
1151 }
1152
1153}