1use std::rc::Rc;
12
13use crate::containers::TimelyStack;
14use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
15use crate::trace::implementations::spine_fueled::Spine;
16use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
17use crate::trace::rc_blanket_impls::RcBuilder;
18
19use super::{Update, Layout, Vector, TStack, Preferred};
20
21pub use self::val_batch::{OrdValBatch, OrdValBuilder};
22pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
23
/// A spine of reference-counted [`OrdValBatch`]es over `((K, V), T, R)` updates, using the `Vector` layout.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
/// A merge batcher for `Vec<((K, V), T, R)>` input, chunked by [`VecChunker`] and merged by [`VecMerger`].
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
/// Builds reference-counted `Vector`-layout [`OrdValBatch`]es (the batch type of [`OrdValSpine`])
/// from `Vec<((K, V), T, R)>` chunks.
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

/// A spine of reference-counted [`OrdValBatch`]es over `((K, V), T, R)` updates, using the `TStack` layout.
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
/// A merge batcher for `Vec<((K, V), T, R)>` input, chunked by [`ColumnationChunker`] and merged by [`ColMerger`].
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// Builds reference-counted `TStack`-layout [`OrdValBatch`]es (the batch type of [`ColValSpine`])
/// from [`TimelyStack`] chunks.
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

/// A spine of reference-counted [`OrdKeyBatch`]es over `((K, ()), T, R)` updates, using the `Vector` layout.
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
/// A merge batcher for `Vec<((K, ()), T, R)>` input, chunked by [`VecChunker`] and merged by [`VecMerger`].
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>;
/// Builds reference-counted `Vector`-layout [`OrdKeyBatch`]es (the batch type of [`OrdKeySpine`])
/// from `Vec<((K, ()), T, R)>` chunks.
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;

/// A spine of reference-counted [`OrdKeyBatch`]es over `((K, ()), T, R)` updates, using the `TStack` layout.
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
/// A merge batcher for `Vec<((K, ()), T, R)>` input, chunked by [`ColumnationChunker`] and merged by [`ColMerger`].
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>;
/// Builds reference-counted `TStack`-layout [`OrdKeyBatch`]es (the batch type of [`ColKeySpine`])
/// from [`TimelyStack`] chunks.
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;

/// A spine of reference-counted [`OrdValBatch`]es using the [`Preferred`] layout.
pub type PreferredSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Preferred<K,V,T,R>>>>;
/// A merge batcher over the owned forms of the key and value (`<K as ToOwned>::Owned`,
/// `<V as ToOwned>::Owned`), chunked by [`ColumnationChunker`] and merged by [`ColMerger`].
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColMerger<(<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R>>;
/// Builds reference-counted [`Preferred`]-layout [`OrdValBatch`]es from [`TimelyStack`] chunks
/// of owned `((K, V), T, R)` updates.
pub type PreferredBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Preferred<K,V,T,R>, TimelyStack<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>>;
64
65mod val_batch {
70
71 use std::marker::PhantomData;
72 use serde::{Deserialize, Serialize};
73 use timely::container::PushInto;
74 use timely::progress::{Antichain, frontier::AntichainRef};
75
76 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
77 use crate::trace::implementations::{BatchContainer, BuilderInput};
78 use crate::IntoOwned;
79
80 use super::{Layout, Update};
81
82 #[derive(Debug, Serialize, Deserialize)]
    /// Columnar storage for a batch of `(key, val, time, diff)` updates.
    ///
    /// Keys, values, and updates live in separate containers, linked by offset lists.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct OrdValStorage<L: Layout> {
        /// Distinct keys, in ascending order (the cursor's `seek_key` and the merger's key
        /// comparison both rely on this ordering).
        pub keys: L::KeyContainer,
        /// Offsets into `vals`: the values of key `i` are `vals[keys_offs[i] .. keys_offs[i+1]]`.
        pub keys_offs: L::OffsetContainer,
        /// Values, grouped by key and in ascending order within each key.
        pub vals: L::ValContainer,
        /// Offsets into `times`/`diffs`: the updates of value `i` are `vals_offs[i] .. vals_offs[i+1]`.
        ///
        /// An empty range (`vals_offs[i] == vals_offs[i+1]`) encodes a value with a single update
        /// equal to the update at `vals_offs[i] - 1`; see `updates_for_value`.
        pub vals_offs: L::OffsetContainer,
        /// Update times, parallel to `diffs`.
        pub times: L::TimeContainer,
        /// Update differences, parallel to `times`.
        pub diffs: L::DiffContainer,
    }
107
108 impl<L: Layout> OrdValStorage<L> {
109 fn values_for_key(&self, index: usize) -> (usize, usize) {
111 (self.keys_offs.index(index), self.keys_offs.index(index+1))
112 }
113 fn updates_for_value(&self, index: usize) -> (usize, usize) {
115 let mut lower = self.vals_offs.index(index);
116 let upper = self.vals_offs.index(index+1);
117 if lower == upper {
120 assert!(lower > 0);
121 lower -= 1;
122 }
123 (lower, upper)
124 }
125 }
126
    /// An immutable collection of `(key, val, time, diff)` updates with a description of the
    /// time interval it covers.
    ///
    /// The explicit serde bound requires each container in the layout to be (de)serializable.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValBatch<L: Layout> {
        /// The keys, values, and updates of the batch.
        pub storage: OrdValStorage<L>,
        /// The lower and upper frontiers of the batch, and the `since` frontier its times are advanced to.
        pub description: Description<<L::Target as Update>::Time>,
        /// The number of updates represented. This includes updates encoded implicitly by an
        /// empty `vals_offs` range, which are absent from `storage.times`.
        pub updates: usize,
    }
151
152 impl<L: Layout> BatchReader for OrdValBatch<L> {
153 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
154 type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
155 type Time = <L::Target as Update>::Time;
156 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
157 type Diff = <L::Target as Update>::Diff;
158 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
159
160 type Cursor = OrdValCursor<L>;
161 fn cursor(&self) -> Self::Cursor {
162 OrdValCursor {
163 key_cursor: 0,
164 val_cursor: 0,
165 phantom: PhantomData,
166 }
167 }
168 fn len(&self) -> usize {
169 self.updates
172 }
173 fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
174 }
175
176 impl<L: Layout> Batch for OrdValBatch<L> {
177 type Merger = OrdValMerger<L>;
178
179 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
180 OrdValMerger::new(self, other, compaction_frontier)
181 }
182
183 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
184 use timely::progress::Timestamp;
185 Self {
186 storage: OrdValStorage {
187 keys: L::KeyContainer::with_capacity(0),
188 keys_offs: L::OffsetContainer::with_capacity(0),
189 vals: L::ValContainer::with_capacity(0),
190 vals_offs: L::OffsetContainer::with_capacity(0),
191 times: L::TimeContainer::with_capacity(0),
192 diffs: L::DiffContainer::with_capacity(0),
193 },
194 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
195 updates: 0,
196 }
197 }
198 }
199
    /// State for an incremental, fuel-limited merge of two [`OrdValBatch`]es.
    pub struct OrdValMerger<L: Layout> {
        /// Index of the next key to consider in the first input.
        key_cursor1: usize,
        /// Index of the next key to consider in the second input.
        key_cursor2: usize,
        /// The merged storage under construction.
        result: OrdValStorage<L>,
        /// Frontiers of the merged batch; its `since` is used to advance times while merging.
        description: Description<<L::Target as Update>::Time>,

        /// Scratch buffer collecting the (advanced) updates of one value before consolidation.
        update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Number of updates elided from `result` because they repeat the previously written update.
        singletons: usize,
    }
219
    impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
    where
        OrdValBatch<L>: Batch<Time=<L::Target as Update>::Time>,
        for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
        for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
    {
        /// Prepares to merge `batch1` with the batch `batch2` that immediately follows it.
        ///
        /// The merged batch spans `batch1.lower()` to `batch2.upper()`. Its `since` is the join of
        /// both inputs' `since` frontiers and `compaction_frontier`.
        ///
        /// # Panics
        /// Panics if `batch1.upper() != batch2.lower()`.
        fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {

            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            // Size every container from both inputs combined.
            let mut storage = OrdValStorage {
                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
                vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
                vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
                times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
                diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
            };

            // Seed both offset lists with their leading zero.
            // NOTE(review): the explicit `L::OffsetContainer` annotations presumably help type
            // inference resolve `push`; confirm before removing them.
            let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
            keys_offs.push(0);
            let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
            vals_offs.push(0);

            OrdValMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: storage,
                description,
                update_stash: Vec::new(),
                singletons: 0,
            }
        }
        /// Finishes the merge. The update count includes updates elided as repeats (`singletons`).
        fn done(self) -> OrdValBatch<L> {
            OrdValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description: self.description,
            }
        }
        /// Performs merge work key by key until `fuel` is used up or both inputs are exhausted.
        ///
        /// Effort is measured as the number of updates newly written to `result.times`. Elided
        /// repeats and cancelled updates therefore cost nothing. Fuel is only checked between keys,
        /// so `*fuel` can end up negative.
        fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {

            let starting_updates = self.result.times.len();
            let mut effort = 0isize;

            // While both inputs have keys remaining, merge one key at a time.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            // At most one input has keys left; copy them one key at a time so the work stays
            // interruptible and times are still advanced and consolidated.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                effort = (self.result.times.len() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }
298
299 impl<L: Layout> OrdValMerger<L> {
301 fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
309 let init_vals = self.result.vals.len();
311 let (mut lower, upper) = source.values_for_key(cursor);
312 while lower < upper {
313 self.stash_updates_for_val(source, lower);
314 if let Some(off) = self.consolidate_updates() {
315 self.result.vals_offs.push(off);
316 self.result.vals.push(source.vals.index(lower));
317 }
318 lower += 1;
319 }
320
321 if self.result.vals.len() > init_vals {
323 self.result.keys.push(source.keys.index(cursor));
324 self.result.keys_offs.push(self.result.vals.len());
325 }
326 }
327 fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
332 use ::std::cmp::Ordering;
333 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
334 Ordering::Less => {
335 self.copy_key(source1, self.key_cursor1);
336 self.key_cursor1 += 1;
337 },
338 Ordering::Equal => {
339 let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
341 let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
342 if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
343 self.result.keys.push(source1.keys.index(self.key_cursor1));
344 self.result.keys_offs.push(off);
345 }
346 self.key_cursor1 += 1;
348 self.key_cursor2 += 1;
349 },
350 Ordering::Greater => {
351 self.copy_key(source2, self.key_cursor2);
352 self.key_cursor2 += 1;
353 },
354 }
355 }
356 fn merge_vals(
361 &mut self,
362 (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
363 (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
364 ) -> Option<usize> {
365 let init_vals = self.result.vals.len();
367 while lower1 < upper1 && lower2 < upper2 {
368 use ::std::cmp::Ordering;
372 match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
373 Ordering::Less => {
374 self.stash_updates_for_val(source1, lower1);
376 if let Some(off) = self.consolidate_updates() {
377 self.result.vals_offs.push(off);
378 self.result.vals.push(source1.vals.index(lower1));
379 }
380 lower1 += 1;
381 },
382 Ordering::Equal => {
383 self.stash_updates_for_val(source1, lower1);
384 self.stash_updates_for_val(source2, lower2);
385 if let Some(off) = self.consolidate_updates() {
386 self.result.vals_offs.push(off);
387 self.result.vals.push(source1.vals.index(lower1));
388 }
389 lower1 += 1;
390 lower2 += 1;
391 },
392 Ordering::Greater => {
393 self.stash_updates_for_val(source2, lower2);
395 if let Some(off) = self.consolidate_updates() {
396 self.result.vals_offs.push(off);
397 self.result.vals.push(source2.vals.index(lower2));
398 }
399 lower2 += 1;
400 },
401 }
402 }
403 while lower1 < upper1 {
405 self.stash_updates_for_val(source1, lower1);
406 if let Some(off) = self.consolidate_updates() {
407 self.result.vals_offs.push(off);
408 self.result.vals.push(source1.vals.index(lower1));
409 }
410 lower1 += 1;
411 }
412 while lower2 < upper2 {
413 self.stash_updates_for_val(source2, lower2);
414 if let Some(off) = self.consolidate_updates() {
415 self.result.vals_offs.push(off);
416 self.result.vals.push(source2.vals.index(lower2));
417 }
418 lower2 += 1;
419 }
420
421 if self.result.vals.len() > init_vals {
423 Some(self.result.vals.len())
424 } else {
425 None
426 }
427 }
428
429 fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
431 let (lower, upper) = source.updates_for_value(index);
432 for i in lower .. upper {
433 let time = source.times.index(i);
435 let diff = source.diffs.index(i);
436 use crate::lattice::Lattice;
437 let mut new_time: <L::Target as Update>::Time = time.into_owned();
438 new_time.advance_by(self.description.since().borrow());
439 self.update_stash.push((new_time, diff.into_owned()));
440 }
441 }
442
443 fn consolidate_updates(&mut self) -> Option<usize> {
445 use crate::consolidation;
446 consolidation::consolidate(&mut self.update_stash);
447 if !self.update_stash.is_empty() {
448 let time_diff = self.result.times.last().zip(self.result.diffs.last());
451 let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
452 let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
453 let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
454 t1.eq(&t2) && d1.eq(&d2)
455 });
456 if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
457 self.update_stash.clear();
459 self.singletons += 1;
460 }
461 else {
462 for (time, diff) in self.update_stash.drain(..) {
464 self.result.times.push(time);
465 self.result.diffs.push(diff);
466 }
467 }
468 Some(self.result.times.len())
469 } else {
470 None
471 }
472 }
473 }
474
    /// A cursor over the keys, values, and updates of an [`OrdValBatch`].
    ///
    /// The cursor stores only positions; the batch is passed to each method as `storage`.
    pub struct OrdValCursor<L: Layout> {
        /// Index of the current key in `storage.keys`.
        key_cursor: usize,
        /// Index of the current value in `storage.vals`.
        val_cursor: usize,
        /// Ties the cursor to the layout `L` without storing anything of that type.
        phantom: PhantomData<L>,
    }
484
485 impl<L: Layout> Cursor for OrdValCursor<L> {
486
487 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
488 type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
489 type Time = <L::Target as Update>::Time;
490 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
491 type Diff = <L::Target as Update>::Diff;
492 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
493
494 type Storage = OrdValBatch<L>;
495
496 fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
497 fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
498 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
499 let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
500 for index in lower .. upper {
501 let time = storage.storage.times.index(index);
502 let diff = storage.storage.diffs.index(index);
503 logic(time, diff);
504 }
505 }
506 fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
507 fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
508 fn step_key(&mut self, storage: &OrdValBatch<L>){
509 self.key_cursor += 1;
510 if self.key_valid(storage) {
511 self.rewind_vals(storage);
512 }
513 else {
514 self.key_cursor = storage.storage.keys.len();
515 }
516 }
517 fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
518 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
519 if self.key_valid(storage) {
520 self.rewind_vals(storage);
521 }
522 }
523 fn step_val(&mut self, storage: &OrdValBatch<L>) {
524 self.val_cursor += 1;
525 if !self.val_valid(storage) {
526 self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
527 }
528 }
529 fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
530 self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
531 }
532 fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
533 self.key_cursor = 0;
534 if self.key_valid(storage) {
535 self.rewind_vals(storage)
536 }
537 }
538 fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
539 self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
540 }
541 }
542
    /// Builds an [`OrdValBatch`] from chunks of input of type `CI`.
    pub struct OrdValBuilder<L: Layout, CI> {
        /// The storage being built.
        pub result: OrdValStorage<L>,
        /// An update held back because it equals the last written `(time, diff)`.
        ///
        /// It is written only if another update follows for the same value. Otherwise it stays
        /// implicit, encoded by an empty `vals_offs` range.
        singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Number of held-back updates that were never written and so remain implicit.
        singletons: usize,
        /// Records the input type `CI` without storing a value of it.
        _marker: PhantomData<CI>,
    }
557
558 impl<L: Layout, CI> OrdValBuilder<L, CI> {
559 fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
571 if self.result.times.last().map(|t| t == <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time)) == Some(true) &&
573 self.result.diffs.last().map(|d| d == <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff)) == Some(true)
574 {
575 assert!(self.singleton.is_none());
576 self.singleton = Some((time, diff));
577 }
578 else {
579 if let Some((time, diff)) = self.singleton.take() {
581 self.result.times.push(time);
582 self.result.diffs.push(diff);
583 }
584 self.result.times.push(time);
585 self.result.diffs.push(diff);
586 }
587 }
588 }
589
    impl<L, CI> Builder for OrdValBuilder<L, CI>
    where
        L: Layout,
        // NOTE(review): `for<'a>` here binds a lifetime this bound does not use.
        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
        for<'a> L::KeyContainer: PushInto<CI::Key<'a>>,
        for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
        for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
        for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
    {

        type Input = CI;
        type Time = <L::Target as Update>::Time;
        type Output = OrdValBatch<L>;

        /// Creates a builder with room for `keys` keys, `vals` values, and `upds` updates.
        ///
        /// No leading zero offsets are written here. The first `push` writes them, or `done`
        /// does if nothing was pushed.
        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
            Self {
                result: OrdValStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    // One more offset than entries, for the trailing upper bound.
                    keys_offs: L::OffsetContainer::with_capacity(keys + 1),
                    vals: L::ValContainer::with_capacity(vals),
                    vals_offs: L::OffsetContainer::with_capacity(vals + 1),
                    times: L::TimeContainer::with_capacity(upds),
                    diffs: L::DiffContainer::with_capacity(upds),
                },
                singleton: None,
                singletons: 0,
                _marker: PhantomData,
            }
        }

        /// Appends every update drained from `chunk`.
        ///
        /// Each item is compared only against the last written key and value. Input presumably
        /// must arrive sorted by key then value, and consolidated.
        /// NOTE(review): confirm that callers guarantee this.
        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, val, time, diff) = CI::into_parts(item);
                // Continuation of the last written key?
                if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    // Continuation of the last written value as well: just add the update.
                    if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
                        self.push_update(time, diff);
                    } else {
                        // New value: record the boundary after the previous value's updates.
                        self.result.vals_offs.push(self.result.times.len());
                        // A held update belonged to the value just closed and stays implicit.
                        if self.singleton.take().is_some() { self.singletons += 1; }
                        self.push_update(time, diff);
                        self.result.vals.push(val);
                    }
                } else {
                    // New key: close the previous value's updates and the previous key's values.
                    self.result.vals_offs.push(self.result.times.len());
                    if self.singleton.take().is_some() { self.singletons += 1; }
                    self.result.keys_offs.push(self.result.vals.len());
                    self.push_update(time, diff);
                    self.result.vals.push(val);
                    self.result.keys.push(key);
                }
            }
        }

        /// Closes the final value and key ranges, counts any held update, and returns the batch.
        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
            self.result.vals_offs.push(self.result.times.len());
            // A held update at the end stays implicit, so it is counted rather than written.
            if self.singleton.take().is_some() { self.singletons += 1; }
            self.result.keys_offs.push(self.result.vals.len());
            OrdValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description,
            }
        }

        /// Builds a batch from every chunk in `chain`, draining it. The builder is pre-sized using
        /// the counts reported by `key_val_upd_counts`.
        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }
673}
674
675mod key_batch {
676
677 use std::marker::PhantomData;
678 use serde::{Deserialize, Serialize};
679 use timely::container::PushInto;
680 use timely::progress::{Antichain, frontier::AntichainRef};
681
682 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
683 use crate::trace::implementations::{BatchContainer, BuilderInput};
684 use crate::IntoOwned;
685
686 use super::{Layout, Update};
687
688 #[derive(Debug, Serialize, Deserialize)]
    /// Columnar storage for a batch of `(key, (), time, diff)` updates.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct OrdKeyStorage<L: Layout> {
        /// Distinct keys, in ascending order (the cursor's `seek_key` and the merger's key
        /// comparison both rely on this ordering).
        pub keys: L::KeyContainer,
        /// Offsets into `times`/`diffs`: the updates of key `i` are `keys_offs[i] .. keys_offs[i+1]`.
        ///
        /// An empty range encodes a key with a single update equal to the update at
        /// `keys_offs[i] - 1`; see `updates_for_key`.
        pub keys_offs: L::OffsetContainer,
        /// Update times, parallel to `diffs`.
        pub times: L::TimeContainer,
        /// Update differences, parallel to `times`.
        pub diffs: L::DiffContainer,
    }
707
708 impl<L: Layout> OrdKeyStorage<L> {
709 fn updates_for_key(&self, index: usize) -> (usize, usize) {
711 let mut lower = self.keys_offs.index(index);
712 let upper = self.keys_offs.index(index+1);
713 if lower == upper {
716 assert!(lower > 0);
717 lower -= 1;
718 }
719 (lower, upper)
720 }
721 }
722
    /// An immutable collection of `(key, (), time, diff)` updates with a description of the
    /// time interval it covers.
    ///
    /// The explicit serde bound requires each container in the layout to be (de)serializable.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdKeyBatch<L: Layout> {
        /// The keys and updates of the batch.
        pub storage: OrdKeyStorage<L>,
        /// The lower and upper frontiers of the batch, and the `since` frontier its times are advanced to.
        pub description: Description<<L::Target as Update>::Time>,
        /// The number of updates represented. This includes updates encoded implicitly by an
        /// empty `keys_offs` range, which are absent from `storage.times`.
        pub updates: usize,
    }
746
747 impl<L: Layout> BatchReader for OrdKeyBatch<L> {
748
749 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
750 type Val<'a> = &'a ();
751 type Time = <L::Target as Update>::Time;
752 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
753 type Diff = <L::Target as Update>::Diff;
754 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
755
756 type Cursor = OrdKeyCursor<L>;
757 fn cursor(&self) -> Self::Cursor {
758 OrdKeyCursor {
759 key_cursor: 0,
760 val_stepped: false,
761 phantom: std::marker::PhantomData,
762 }
763 }
764 fn len(&self) -> usize {
765 self.updates
768 }
769 fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
770 }
771
772 impl<L: Layout> Batch for OrdKeyBatch<L> {
773 type Merger = OrdKeyMerger<L>;
774
775 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
776 OrdKeyMerger::new(self, other, compaction_frontier)
777 }
778
779 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
780 use timely::progress::Timestamp;
781 Self {
782 storage: OrdKeyStorage {
783 keys: L::KeyContainer::with_capacity(0),
784 keys_offs: L::OffsetContainer::with_capacity(0),
785 times: L::TimeContainer::with_capacity(0),
786 diffs: L::DiffContainer::with_capacity(0),
787 },
788 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
789 updates: 0,
790 }
791 }
792 }
793
    /// State for an incremental, fuel-limited merge of two [`OrdKeyBatch`]es.
    pub struct OrdKeyMerger<L: Layout> {
        /// Index of the next key to consider in the first input.
        key_cursor1: usize,
        /// Index of the next key to consider in the second input.
        key_cursor2: usize,
        /// The merged storage under construction.
        result: OrdKeyStorage<L>,
        /// Frontiers of the merged batch; its `since` is used to advance times while merging.
        description: Description<<L::Target as Update>::Time>,

        /// Scratch buffer collecting the (advanced) updates of one key before consolidation.
        update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Number of updates elided from `result` because they repeat the previously written update.
        singletons: usize,
    }
813
    impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
    where
        OrdKeyBatch<L>: Batch<Time=<L::Target as Update>::Time>,
        for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
        for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
    {
        /// Prepares to merge `batch1` with the batch `batch2` that immediately follows it.
        ///
        /// The merged batch spans `batch1.lower()` to `batch2.upper()`. Its `since` is the join of
        /// both inputs' `since` frontiers and `compaction_frontier`.
        ///
        /// # Panics
        /// Panics if `batch1.upper() != batch2.lower()`.
        fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {

            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            // Size every container from both inputs combined.
            let mut storage = OrdKeyStorage {
                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
                times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
                diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
            };

            // Seed the offset list with its leading zero.
            // NOTE(review): the explicit `L::OffsetContainer` annotation presumably helps type
            // inference resolve `push`; confirm before removing it.
            let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
            keys_offs.push(0);

            OrdKeyMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: storage,
                description,
                update_stash: Vec::new(),
                singletons: 0,
            }
        }
        /// Finishes the merge. The update count includes updates elided as repeats (`singletons`).
        fn done(self) -> OrdKeyBatch<L> {
            OrdKeyBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description: self.description,
            }
        }
        /// Performs merge work key by key until `fuel` is used up or both inputs are exhausted.
        ///
        /// Effort is measured as the number of updates newly written to `result.times`. Elided
        /// repeats and cancelled updates therefore cost nothing. Fuel is only checked between keys,
        /// so `*fuel` can end up negative.
        fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {

            let starting_updates = self.result.times.len();
            let mut effort = 0isize;

            // While both inputs have keys remaining, merge one key at a time.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            // At most one input has keys left; copy them one key at a time so the work stays
            // interruptible and times are still advanced and consolidated.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                effort = (self.result.times.len() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }
887
888 impl<L: Layout> OrdKeyMerger<L> {
890 fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
898 self.stash_updates_for_key(source, cursor);
899 if let Some(off) = self.consolidate_updates() {
900 self.result.keys_offs.push(off);
901 self.result.keys.push(source.keys.index(cursor));
902 }
903 }
904 fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
909 use ::std::cmp::Ordering;
910 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
911 Ordering::Less => {
912 self.copy_key(source1, self.key_cursor1);
913 self.key_cursor1 += 1;
914 },
915 Ordering::Equal => {
916 self.stash_updates_for_key(source1, self.key_cursor1);
918 self.stash_updates_for_key(source2, self.key_cursor2);
919 if let Some(off) = self.consolidate_updates() {
920 self.result.keys_offs.push(off);
921 self.result.keys.push(source1.keys.index(self.key_cursor1));
922 }
923 self.key_cursor1 += 1;
925 self.key_cursor2 += 1;
926 },
927 Ordering::Greater => {
928 self.copy_key(source2, self.key_cursor2);
929 self.key_cursor2 += 1;
930 },
931 }
932 }
933
934 fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
936 let (lower, upper) = source.updates_for_key(index);
937 for i in lower .. upper {
938 let time = source.times.index(i);
940 let diff = source.diffs.index(i);
941 use crate::lattice::Lattice;
942 let mut new_time = time.into_owned();
943 new_time.advance_by(self.description.since().borrow());
944 self.update_stash.push((new_time, diff.into_owned()));
945 }
946 }
947
948 fn consolidate_updates(&mut self) -> Option<usize> {
950 use crate::consolidation;
951 consolidation::consolidate(&mut self.update_stash);
952 if !self.update_stash.is_empty() {
953 let time_diff = self.result.times.last().zip(self.result.diffs.last());
956 let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
957 let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
958 let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
959 t1.eq(&t2) && d1.eq(&d2)
960 });
961 if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
962 self.update_stash.clear();
964 self.singletons += 1;
965 }
966 else {
967 for (time, diff) in self.update_stash.drain(..) {
969 self.result.times.push(time);
970 self.result.diffs.push(diff);
971 }
972 }
973 Some(self.result.times.len())
974 } else {
975 None
976 }
977 }
978 }
979
    /// A cursor over the keys and updates of an [`OrdKeyBatch`]. Each key has a single unit value.
    ///
    /// The cursor stores only positions; the batch is passed to each method as `storage`.
    pub struct OrdKeyCursor<L: Layout> {
        /// Index of the current key in `storage.keys`.
        key_cursor: usize,
        /// Whether the current key's single unit value has been stepped past.
        val_stepped: bool,
        /// Ties the cursor to the layout `L` without storing anything of that type.
        phantom: PhantomData<L>,
    }
989
990 impl<L: Layout> Cursor for OrdKeyCursor<L> {
991 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
992 type Val<'a> = &'a ();
993 type Time = <L::Target as Update>::Time;
994 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
995 type Diff = <L::Target as Update>::Diff;
996 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
997
998 type Storage = OrdKeyBatch<L>;
999
1000 fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
1001 fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() }
1002 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
1003 let (lower, upper) = storage.storage.updates_for_key(self.key_cursor);
1004 for index in lower .. upper {
1005 let time = storage.storage.times.index(index);
1006 let diff = storage.storage.diffs.index(index);
1007 logic(time, diff);
1008 }
1009 }
1010 fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
1011 fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
1012 fn step_key(&mut self, storage: &Self::Storage){
1013 self.key_cursor += 1;
1014 if self.key_valid(storage) {
1015 self.rewind_vals(storage);
1016 }
1017 else {
1018 self.key_cursor = storage.storage.keys.len();
1019 }
1020 }
1021 fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
1022 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
1023 if self.key_valid(storage) {
1024 self.rewind_vals(storage);
1025 }
1026 }
1027 fn step_val(&mut self, _storage: &Self::Storage) {
1028 self.val_stepped = true;
1029 }
1030 fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
1031 fn rewind_keys(&mut self, storage: &Self::Storage) {
1032 self.key_cursor = 0;
1033 if self.key_valid(storage) {
1034 self.rewind_vals(storage)
1035 }
1036 }
1037 fn rewind_vals(&mut self, _storage: &Self::Storage) {
1038 self.val_stepped = false;
1039 }
1040 }
1041
    /// Builds an [`OrdKeyBatch`] from chunks of input of type `CI`.
    pub struct OrdKeyBuilder<L: Layout, CI> {
        /// The storage being built.
        pub result: OrdKeyStorage<L>,
        /// An update held back because it equals the last written `(time, diff)`.
        ///
        /// It is written only if another update follows for the same key. Otherwise it stays
        /// implicit, encoded by an empty `keys_offs` range.
        singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Number of held-back updates that were never written and so remain implicit.
        singletons: usize,
        /// Records the input type `CI` without storing a value of it.
        _marker: PhantomData<CI>,
    }
1056
1057 impl<L: Layout, CI> OrdKeyBuilder<L, CI> {
1058 fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
1070 let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time);
1072 let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff);
1073 if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) {
1074 assert!(self.singleton.is_none());
1075 self.singleton = Some((time, diff));
1076 }
1077 else {
1078 if let Some((time, diff)) = self.singleton.take() {
1080 self.result.times.push(time);
1081 self.result.diffs.push(diff);
1082 }
1083 self.result.times.push(time);
1084 self.result.diffs.push(diff);
1085 }
1086 }
1087 }
1088
    impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
    where
        L: Layout,
        // NOTE(review): `for<'a>` here binds a lifetime this bound does not use.
        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
        for<'a> L::KeyContainer: PushInto<CI::Key<'a>>,
        for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
        for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
    {

        type Input = CI;
        type Time = <L::Target as Update>::Time;
        type Output = OrdKeyBatch<L>;

        /// Creates a builder with room for `keys` keys and `upds` updates. The value count is
        /// ignored because keys carry only the unit value.
        ///
        /// No leading zero offset is written here. The first `push` writes it, or `done` does if
        /// nothing was pushed.
        fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
            Self {
                result: OrdKeyStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    // One more offset than keys, for the trailing upper bound.
                    keys_offs: L::OffsetContainer::with_capacity(keys + 1),
                    times: L::TimeContainer::with_capacity(upds),
                    diffs: L::DiffContainer::with_capacity(upds),
                },
                singleton: None,
                singletons: 0,
                _marker: PhantomData,
            }
        }

        /// Appends every update drained from `chunk`, discarding its (unit) values.
        ///
        /// Each item is compared only against the last written key. Input presumably must
        /// arrive sorted by key, and consolidated.
        /// NOTE(review): confirm that callers guarantee this.
        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, _val, time, diff) = CI::into_parts(item);
                // Continuation of the last written key: just add the update.
                if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    self.push_update(time, diff);
                } else {
                    // New key: record the boundary after the previous key's updates.
                    self.result.keys_offs.push(self.result.times.len());
                    // A held update belonged to the key just closed and stays implicit.
                    if self.singleton.take().is_some() { self.singletons += 1; }
                    self.push_update(time, diff);
                    self.result.keys.push(key);
                }
            }
        }

        /// Closes the final key's range, counts any held update, and returns the batch.
        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
            self.result.keys_offs.push(self.result.times.len());
            // A held update at the end stays implicit, so it is counted rather than written.
            if self.singleton.take().is_some() { self.singletons += 1; }
            OrdKeyBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description,
            }
        }

        /// Builds a batch from every chunk in `chain`, draining it. The builder is pre-sized using
        /// the counts reported by `key_val_upd_counts`.
        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }
1158
1159}