1use std::rc::Rc;
12
13use crate::containers::TimelyStack;
14use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
15use crate::trace::implementations::spine_fueled::Spine;
16use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
17use crate::trace::rc_blanket_impls::RcBuilder;
18
19use super::{Update, Layout, Vector, TStack, Preferred};
20
21pub use self::val_batch::{OrdValBatch, OrdValBuilder};
22pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
23
24pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
26pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
28pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
30
31pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
36pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
38pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;
40
41pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
43pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>;
45pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;
47
48pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
53pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>;
55pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;
57
58pub type PreferredSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Preferred<K,V,T,R>>>>;
60pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColMerger<(<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R>>;
62pub type PreferredBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Preferred<K,V,T,R>, TimelyStack<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>>;
64
65mod val_batch {
70
71 use std::marker::PhantomData;
72 use serde::{Deserialize, Serialize};
73 use timely::container::PushInto;
74 use timely::progress::{Antichain, frontier::AntichainRef};
75
76 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
77 use crate::trace::implementations::{BatchContainer, BuilderInput};
78 use crate::IntoOwned;
79
80 use super::{Layout, Update};
81
82 #[derive(Debug, Serialize, Deserialize)]
84 pub struct OrdValStorage<L: Layout> {
85 pub keys: L::KeyContainer,
87 pub keys_offs: L::OffsetContainer,
91 pub vals: L::ValContainer,
93 pub vals_offs: L::OffsetContainer,
102 pub times: L::TimeContainer,
104 pub diffs: L::DiffContainer,
106 }
107
108 impl<L: Layout> OrdValStorage<L> {
109 fn values_for_key(&self, index: usize) -> (usize, usize) {
111 (self.keys_offs.index(index), self.keys_offs.index(index+1))
112 }
113 fn updates_for_value(&self, index: usize) -> (usize, usize) {
115 let mut lower = self.vals_offs.index(index);
116 let upper = self.vals_offs.index(index+1);
117 if lower == upper {
120 assert!(lower > 0);
121 lower -= 1;
122 }
123 (lower, upper)
124 }
125 }
126
127 #[derive(Serialize, Deserialize)]
132 #[serde(bound = "
133 L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
134 L::ValContainer: Serialize + for<'a> Deserialize<'a>,
135 L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
136 L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
137 L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
138 ")]
139 pub struct OrdValBatch<L: Layout> {
140 pub storage: OrdValStorage<L>,
142 pub description: Description<<L::Target as Update>::Time>,
144 pub updates: usize,
150 }
151
152 impl<L: Layout> BatchReader for OrdValBatch<L> {
153 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
154 type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
155 type Time = <L::Target as Update>::Time;
156 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
157 type Diff = <L::Target as Update>::Diff;
158 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
159
160 type Cursor = OrdValCursor<L>;
161 fn cursor(&self) -> Self::Cursor {
162 OrdValCursor {
163 key_cursor: 0,
164 val_cursor: 0,
165 phantom: PhantomData,
166 }
167 }
168 fn len(&self) -> usize {
169 self.updates
172 }
173 fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
174 }
175
176 impl<L: Layout> Batch for OrdValBatch<L> {
177 type Merger = OrdValMerger<L>;
178
179 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
180 OrdValMerger::new(self, other, compaction_frontier)
181 }
182
183 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
184 use timely::progress::Timestamp;
185 Self {
186 storage: OrdValStorage {
187 keys: L::KeyContainer::with_capacity(0),
188 keys_offs: L::OffsetContainer::with_capacity(0),
189 vals: L::ValContainer::with_capacity(0),
190 vals_offs: L::OffsetContainer::with_capacity(0),
191 times: L::TimeContainer::with_capacity(0),
192 diffs: L::DiffContainer::with_capacity(0),
193 },
194 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
195 updates: 0,
196 }
197 }
198 }
199
200 pub struct OrdValMerger<L: Layout> {
202 key_cursor1: usize,
204 key_cursor2: usize,
206 result: OrdValStorage<L>,
208 description: Description<<L::Target as Update>::Time>,
210
211 update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
216 singletons: usize,
218 }
219
220 impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
221 where
222 OrdValBatch<L>: Batch<Time=<L::Target as Update>::Time>,
223 for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
224 for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
225 {
226 fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
227
228 assert!(batch1.upper() == batch2.lower());
229 use crate::lattice::Lattice;
230 let mut since = batch1.description().since().join(batch2.description().since());
231 since = since.join(&compaction_frontier.to_owned());
232
233 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
234
235 let batch1 = &batch1.storage;
236 let batch2 = &batch2.storage;
237
238 let mut storage = OrdValStorage {
239 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
240 keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
241 vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
242 vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
243 times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
244 diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
245 };
246
247 let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
249 keys_offs.push(0);
250 let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
251 vals_offs.push(0);
252
253 OrdValMerger {
254 key_cursor1: 0,
255 key_cursor2: 0,
256 result: storage,
257 description,
258 update_stash: Vec::new(),
259 singletons: 0,
260 }
261 }
262 fn done(self) -> OrdValBatch<L> {
263 OrdValBatch {
264 updates: self.result.times.len() + self.singletons,
265 storage: self.result,
266 description: self.description,
267 }
268 }
269 fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
270
271 let starting_updates = self.result.times.len();
273 let mut effort = 0isize;
274
275 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
277 self.merge_key(&source1.storage, &source2.storage);
278 effort = (self.result.times.len() - starting_updates) as isize;
280 }
281
282 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
285 self.copy_key(&source1.storage, self.key_cursor1);
286 self.key_cursor1 += 1;
287 effort = (self.result.times.len() - starting_updates) as isize;
288 }
289 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
290 self.copy_key(&source2.storage, self.key_cursor2);
291 self.key_cursor2 += 1;
292 effort = (self.result.times.len() - starting_updates) as isize;
293 }
294
295 *fuel -= effort;
296 }
297 }
298
299 impl<L: Layout> OrdValMerger<L> {
301 fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
309 let init_vals = self.result.vals.len();
311 let (mut lower, upper) = source.values_for_key(cursor);
312 while lower < upper {
313 self.stash_updates_for_val(source, lower);
314 if let Some(off) = self.consolidate_updates() {
315 self.result.vals_offs.push(off);
316 self.result.vals.push(source.vals.index(lower));
317 }
318 lower += 1;
319 }
320
321 if self.result.vals.len() > init_vals {
323 self.result.keys.push(source.keys.index(cursor));
324 self.result.keys_offs.push(self.result.vals.len());
325 }
326 }
327 fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
332 use ::std::cmp::Ordering;
333 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
334 Ordering::Less => {
335 self.copy_key(source1, self.key_cursor1);
336 self.key_cursor1 += 1;
337 },
338 Ordering::Equal => {
339 let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
341 let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
342 if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
343 self.result.keys.push(source1.keys.index(self.key_cursor1));
344 self.result.keys_offs.push(off);
345 }
346 self.key_cursor1 += 1;
348 self.key_cursor2 += 1;
349 },
350 Ordering::Greater => {
351 self.copy_key(source2, self.key_cursor2);
352 self.key_cursor2 += 1;
353 },
354 }
355 }
356 fn merge_vals(
361 &mut self,
362 (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
363 (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
364 ) -> Option<usize> {
365 let init_vals = self.result.vals.len();
367 while lower1 < upper1 && lower2 < upper2 {
368 use ::std::cmp::Ordering;
372 match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
373 Ordering::Less => {
374 self.stash_updates_for_val(source1, lower1);
376 if let Some(off) = self.consolidate_updates() {
377 self.result.vals_offs.push(off);
378 self.result.vals.push(source1.vals.index(lower1));
379 }
380 lower1 += 1;
381 },
382 Ordering::Equal => {
383 self.stash_updates_for_val(source1, lower1);
384 self.stash_updates_for_val(source2, lower2);
385 if let Some(off) = self.consolidate_updates() {
386 self.result.vals_offs.push(off);
387 self.result.vals.push(source1.vals.index(lower1));
388 }
389 lower1 += 1;
390 lower2 += 1;
391 },
392 Ordering::Greater => {
393 self.stash_updates_for_val(source2, lower2);
395 if let Some(off) = self.consolidate_updates() {
396 self.result.vals_offs.push(off);
397 self.result.vals.push(source2.vals.index(lower2));
398 }
399 lower2 += 1;
400 },
401 }
402 }
403 while lower1 < upper1 {
405 self.stash_updates_for_val(source1, lower1);
406 if let Some(off) = self.consolidate_updates() {
407 self.result.vals_offs.push(off);
408 self.result.vals.push(source1.vals.index(lower1));
409 }
410 lower1 += 1;
411 }
412 while lower2 < upper2 {
413 self.stash_updates_for_val(source2, lower2);
414 if let Some(off) = self.consolidate_updates() {
415 self.result.vals_offs.push(off);
416 self.result.vals.push(source2.vals.index(lower2));
417 }
418 lower2 += 1;
419 }
420
421 if self.result.vals.len() > init_vals {
423 Some(self.result.vals.len())
424 } else {
425 None
426 }
427 }
428
429 fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
431 let (lower, upper) = source.updates_for_value(index);
432 for i in lower .. upper {
433 let time = source.times.index(i);
435 let diff = source.diffs.index(i);
436 use crate::lattice::Lattice;
437 let mut new_time: <L::Target as Update>::Time = time.into_owned();
438 new_time.advance_by(self.description.since().borrow());
439 self.update_stash.push((new_time, diff.into_owned()));
440 }
441 }
442
443 fn consolidate_updates(&mut self) -> Option<usize> {
445 use crate::consolidation;
446 consolidation::consolidate(&mut self.update_stash);
447 if !self.update_stash.is_empty() {
448 let time_diff = self.result.times.last().zip(self.result.diffs.last());
451 let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
452 let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
453 let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
454 t1.eq(&t2) && d1.eq(&d2)
455 });
456 if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
457 self.update_stash.clear();
459 self.singletons += 1;
460 }
461 else {
462 for (time, diff) in self.update_stash.drain(..) {
464 self.result.times.push(time);
465 self.result.diffs.push(diff);
466 }
467 }
468 Some(self.result.times.len())
469 } else {
470 None
471 }
472 }
473 }
474
475 pub struct OrdValCursor<L: Layout> {
477 key_cursor: usize,
479 val_cursor: usize,
481 phantom: PhantomData<L>,
483 }
484
485 impl<L: Layout> Cursor for OrdValCursor<L> {
486
487 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
488 type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
489 type Time = <L::Target as Update>::Time;
490 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
491 type Diff = <L::Target as Update>::Diff;
492 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
493
494 type Storage = OrdValBatch<L>;
495
496 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
497 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
498
499 fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
500 fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
501 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
502 let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
503 for index in lower .. upper {
504 let time = storage.storage.times.index(index);
505 let diff = storage.storage.diffs.index(index);
506 logic(time, diff);
507 }
508 }
509 fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
510 fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
511 fn step_key(&mut self, storage: &OrdValBatch<L>){
512 self.key_cursor += 1;
513 if self.key_valid(storage) {
514 self.rewind_vals(storage);
515 }
516 else {
517 self.key_cursor = storage.storage.keys.len();
518 }
519 }
520 fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
521 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
522 if self.key_valid(storage) {
523 self.rewind_vals(storage);
524 }
525 }
526 fn step_val(&mut self, storage: &OrdValBatch<L>) {
527 self.val_cursor += 1;
528 if !self.val_valid(storage) {
529 self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
530 }
531 }
532 fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
533 self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
534 }
535 fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
536 self.key_cursor = 0;
537 if self.key_valid(storage) {
538 self.rewind_vals(storage)
539 }
540 }
541 fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
542 self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
543 }
544 }
545
546 pub struct OrdValBuilder<L: Layout, CI> {
548 pub result: OrdValStorage<L>,
552 singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
553 singletons: usize,
558 _marker: PhantomData<CI>,
559 }
560
561 impl<L: Layout, CI> OrdValBuilder<L, CI> {
562 fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
574 if self.result.times.last().map(|t| t == <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time)) == Some(true) &&
576 self.result.diffs.last().map(|d| d == <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff)) == Some(true)
577 {
578 assert!(self.singleton.is_none());
579 self.singleton = Some((time, diff));
580 }
581 else {
582 if let Some((time, diff)) = self.singleton.take() {
584 self.result.times.push(time);
585 self.result.diffs.push(diff);
586 }
587 self.result.times.push(time);
588 self.result.diffs.push(diff);
589 }
590 }
591 }
592
593 impl<L, CI> Builder for OrdValBuilder<L, CI>
594 where
595 L: Layout,
596 CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
597 for<'a> L::KeyContainer: PushInto<CI::Key<'a>>,
598 for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
599 for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
600 for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
601 {
602
603 type Input = CI;
604 type Time = <L::Target as Update>::Time;
605 type Output = OrdValBatch<L>;
606
607 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
608 Self {
610 result: OrdValStorage {
611 keys: L::KeyContainer::with_capacity(keys),
612 keys_offs: L::OffsetContainer::with_capacity(keys + 1),
613 vals: L::ValContainer::with_capacity(vals),
614 vals_offs: L::OffsetContainer::with_capacity(vals + 1),
615 times: L::TimeContainer::with_capacity(upds),
616 diffs: L::DiffContainer::with_capacity(upds),
617 },
618 singleton: None,
619 singletons: 0,
620 _marker: PhantomData,
621 }
622 }
623
624 #[inline]
625 fn push(&mut self, chunk: &mut Self::Input) {
626 for item in chunk.drain() {
627 let (key, val, time, diff) = CI::into_parts(item);
628 if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
630 if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
632 self.push_update(time, diff);
633 } else {
634 self.result.vals_offs.push(self.result.times.len());
636 if self.singleton.take().is_some() { self.singletons += 1; }
637 self.push_update(time, diff);
638 self.result.vals.push(val);
639 }
640 } else {
641 self.result.vals_offs.push(self.result.times.len());
643 if self.singleton.take().is_some() { self.singletons += 1; }
644 self.result.keys_offs.push(self.result.vals.len());
645 self.push_update(time, diff);
646 self.result.vals.push(val);
647 self.result.keys.push(key);
648 }
649 }
650 }
651
652 #[inline(never)]
653 fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
654 self.result.vals_offs.push(self.result.times.len());
656 if self.singleton.take().is_some() { self.singletons += 1; }
658 self.result.keys_offs.push(self.result.vals.len());
659 OrdValBatch {
660 updates: self.result.times.len() + self.singletons,
661 storage: self.result,
662 description,
663 }
664 }
665
666 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
667 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
668 let mut builder = Self::with_capacity(keys, vals, upds);
669 for mut chunk in chain.drain(..) {
670 builder.push(&mut chunk);
671 }
672
673 builder.done(description)
674 }
675 }
676}
677
678mod key_batch {
679
680 use std::marker::PhantomData;
681 use serde::{Deserialize, Serialize};
682 use timely::container::PushInto;
683 use timely::progress::{Antichain, frontier::AntichainRef};
684
685 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
686 use crate::trace::implementations::{BatchContainer, BuilderInput};
687 use crate::IntoOwned;
688
689 use super::{Layout, Update};
690
691 #[derive(Debug, Serialize, Deserialize)]
693 pub struct OrdKeyStorage<L: Layout> {
694 pub keys: L::KeyContainer,
696 pub keys_offs: L::OffsetContainer,
705 pub times: L::TimeContainer,
707 pub diffs: L::DiffContainer,
709 }
710
711 impl<L: Layout> OrdKeyStorage<L> {
712 fn updates_for_key(&self, index: usize) -> (usize, usize) {
714 let mut lower = self.keys_offs.index(index);
715 let upper = self.keys_offs.index(index+1);
716 if lower == upper {
719 assert!(lower > 0);
720 lower -= 1;
721 }
722 (lower, upper)
723 }
724 }
725
726 #[derive(Serialize, Deserialize)]
731 #[serde(bound = "
732 L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
733 L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
734 L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
735 L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
736 ")]
737 pub struct OrdKeyBatch<L: Layout> {
738 pub storage: OrdKeyStorage<L>,
740 pub description: Description<<L::Target as Update>::Time>,
742 pub updates: usize,
748 }
749
750 impl<L: Layout> BatchReader for OrdKeyBatch<L> {
751
752 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
753 type Val<'a> = &'a ();
754 type Time = <L::Target as Update>::Time;
755 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
756 type Diff = <L::Target as Update>::Diff;
757 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
758
759 type Cursor = OrdKeyCursor<L>;
760 fn cursor(&self) -> Self::Cursor {
761 OrdKeyCursor {
762 key_cursor: 0,
763 val_stepped: false,
764 phantom: std::marker::PhantomData,
765 }
766 }
767 fn len(&self) -> usize {
768 self.updates
771 }
772 fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
773 }
774
775 impl<L: Layout> Batch for OrdKeyBatch<L> {
776 type Merger = OrdKeyMerger<L>;
777
778 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
779 OrdKeyMerger::new(self, other, compaction_frontier)
780 }
781
782 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
783 use timely::progress::Timestamp;
784 Self {
785 storage: OrdKeyStorage {
786 keys: L::KeyContainer::with_capacity(0),
787 keys_offs: L::OffsetContainer::with_capacity(0),
788 times: L::TimeContainer::with_capacity(0),
789 diffs: L::DiffContainer::with_capacity(0),
790 },
791 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
792 updates: 0,
793 }
794 }
795 }
796
797 pub struct OrdKeyMerger<L: Layout> {
799 key_cursor1: usize,
801 key_cursor2: usize,
803 result: OrdKeyStorage<L>,
805 description: Description<<L::Target as Update>::Time>,
807
808 update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
813 singletons: usize,
815 }
816
817 impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
818 where
819 OrdKeyBatch<L>: Batch<Time=<L::Target as Update>::Time>,
820 for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
821 for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
822 {
823 fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
824
825 assert!(batch1.upper() == batch2.lower());
826 use crate::lattice::Lattice;
827 let mut since = batch1.description().since().join(batch2.description().since());
828 since = since.join(&compaction_frontier.to_owned());
829
830 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
831
832 let batch1 = &batch1.storage;
833 let batch2 = &batch2.storage;
834
835 let mut storage = OrdKeyStorage {
836 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
837 keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
838 times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
839 diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
840 };
841
842 let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
843 keys_offs.push(0);
844
845 OrdKeyMerger {
846 key_cursor1: 0,
847 key_cursor2: 0,
848 result: storage,
849 description,
850 update_stash: Vec::new(),
851 singletons: 0,
852 }
853 }
854 fn done(self) -> OrdKeyBatch<L> {
855 OrdKeyBatch {
856 updates: self.result.times.len() + self.singletons,
857 storage: self.result,
858 description: self.description,
859 }
860 }
861 fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
862
863 let starting_updates = self.result.times.len();
865 let mut effort = 0isize;
866
867 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
869 self.merge_key(&source1.storage, &source2.storage);
870 effort = (self.result.times.len() - starting_updates) as isize;
872 }
873
874 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
877 self.copy_key(&source1.storage, self.key_cursor1);
878 self.key_cursor1 += 1;
879 effort = (self.result.times.len() - starting_updates) as isize;
880 }
881 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
882 self.copy_key(&source2.storage, self.key_cursor2);
883 self.key_cursor2 += 1;
884 effort = (self.result.times.len() - starting_updates) as isize;
885 }
886
887 *fuel -= effort;
888 }
889 }
890
891 impl<L: Layout> OrdKeyMerger<L> {
893 fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
901 self.stash_updates_for_key(source, cursor);
902 if let Some(off) = self.consolidate_updates() {
903 self.result.keys_offs.push(off);
904 self.result.keys.push(source.keys.index(cursor));
905 }
906 }
907 fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
912 use ::std::cmp::Ordering;
913 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
914 Ordering::Less => {
915 self.copy_key(source1, self.key_cursor1);
916 self.key_cursor1 += 1;
917 },
918 Ordering::Equal => {
919 self.stash_updates_for_key(source1, self.key_cursor1);
921 self.stash_updates_for_key(source2, self.key_cursor2);
922 if let Some(off) = self.consolidate_updates() {
923 self.result.keys_offs.push(off);
924 self.result.keys.push(source1.keys.index(self.key_cursor1));
925 }
926 self.key_cursor1 += 1;
928 self.key_cursor2 += 1;
929 },
930 Ordering::Greater => {
931 self.copy_key(source2, self.key_cursor2);
932 self.key_cursor2 += 1;
933 },
934 }
935 }
936
937 fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
939 let (lower, upper) = source.updates_for_key(index);
940 for i in lower .. upper {
941 let time = source.times.index(i);
943 let diff = source.diffs.index(i);
944 use crate::lattice::Lattice;
945 let mut new_time = time.into_owned();
946 new_time.advance_by(self.description.since().borrow());
947 self.update_stash.push((new_time, diff.into_owned()));
948 }
949 }
950
951 fn consolidate_updates(&mut self) -> Option<usize> {
953 use crate::consolidation;
954 consolidation::consolidate(&mut self.update_stash);
955 if !self.update_stash.is_empty() {
956 let time_diff = self.result.times.last().zip(self.result.diffs.last());
959 let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
960 let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
961 let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
962 t1.eq(&t2) && d1.eq(&d2)
963 });
964 if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
965 self.update_stash.clear();
967 self.singletons += 1;
968 }
969 else {
970 for (time, diff) in self.update_stash.drain(..) {
972 self.result.times.push(time);
973 self.result.diffs.push(diff);
974 }
975 }
976 Some(self.result.times.len())
977 } else {
978 None
979 }
980 }
981 }
982
983 pub struct OrdKeyCursor<L: Layout> {
985 key_cursor: usize,
987 val_stepped: bool,
989 phantom: PhantomData<L>,
991 }
992
993 impl<L: Layout> Cursor for OrdKeyCursor<L> {
994 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
995 type Val<'a> = &'a ();
996 type Time = <L::Target as Update>::Time;
997 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
998 type Diff = <L::Target as Update>::Diff;
999 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
1000
1001 type Storage = OrdKeyBatch<L>;
1002
1003 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
1004 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<&'a ()> { if self.val_valid(storage) { Some(&()) } else { None } }
1005
1006 fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
1007 fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() }
1008 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
1009 let (lower, upper) = storage.storage.updates_for_key(self.key_cursor);
1010 for index in lower .. upper {
1011 let time = storage.storage.times.index(index);
1012 let diff = storage.storage.diffs.index(index);
1013 logic(time, diff);
1014 }
1015 }
1016 fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
1017 fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
1018 fn step_key(&mut self, storage: &Self::Storage){
1019 self.key_cursor += 1;
1020 if self.key_valid(storage) {
1021 self.rewind_vals(storage);
1022 }
1023 else {
1024 self.key_cursor = storage.storage.keys.len();
1025 }
1026 }
1027 fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
1028 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
1029 if self.key_valid(storage) {
1030 self.rewind_vals(storage);
1031 }
1032 }
1033 fn step_val(&mut self, _storage: &Self::Storage) {
1034 self.val_stepped = true;
1035 }
1036 fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
1037 fn rewind_keys(&mut self, storage: &Self::Storage) {
1038 self.key_cursor = 0;
1039 if self.key_valid(storage) {
1040 self.rewind_vals(storage)
1041 }
1042 }
1043 fn rewind_vals(&mut self, _storage: &Self::Storage) {
1044 self.val_stepped = false;
1045 }
1046 }
1047
1048 pub struct OrdKeyBuilder<L: Layout, CI> {
1050 pub result: OrdKeyStorage<L>,
1054 singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
1055 singletons: usize,
1060 _marker: PhantomData<CI>,
1061 }
1062
1063 impl<L: Layout, CI> OrdKeyBuilder<L, CI> {
1064 fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
1076 let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time);
1078 let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff);
1079 if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) {
1080 assert!(self.singleton.is_none());
1081 self.singleton = Some((time, diff));
1082 }
1083 else {
1084 if let Some((time, diff)) = self.singleton.take() {
1086 self.result.times.push(time);
1087 self.result.diffs.push(diff);
1088 }
1089 self.result.times.push(time);
1090 self.result.diffs.push(diff);
1091 }
1092 }
1093 }
1094
1095 impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
1096 where
1097 L: Layout,
1098 CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
1099 for<'a> L::KeyContainer: PushInto<CI::Key<'a>>,
1100 for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
1101 for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
1102 {
1103
1104 type Input = CI;
1105 type Time = <L::Target as Update>::Time;
1106 type Output = OrdKeyBatch<L>;
1107
1108 fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
1109 Self {
1111 result: OrdKeyStorage {
1112 keys: L::KeyContainer::with_capacity(keys),
1113 keys_offs: L::OffsetContainer::with_capacity(keys + 1),
1114 times: L::TimeContainer::with_capacity(upds),
1115 diffs: L::DiffContainer::with_capacity(upds),
1116 },
1117 singleton: None,
1118 singletons: 0,
1119 _marker: PhantomData,
1120 }
1121 }
1122
1123 #[inline]
1124 fn push(&mut self, chunk: &mut Self::Input) {
1125 for item in chunk.drain() {
1126 let (key, _val, time, diff) = CI::into_parts(item);
1127 if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
1129 self.push_update(time, diff);
1130 } else {
1131 self.result.keys_offs.push(self.result.times.len());
1133 if self.singleton.take().is_some() { self.singletons += 1; }
1135 self.push_update(time, diff);
1136 self.result.keys.push(key);
1137 }
1138 }
1139 }
1140
1141 #[inline(never)]
1142 fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
1143 self.result.keys_offs.push(self.result.times.len());
1145 if self.singleton.take().is_some() { self.singletons += 1; }
1147 OrdKeyBatch {
1148 updates: self.result.times.len() + self.singletons,
1149 storage: self.result,
1150 description,
1151 }
1152 }
1153
1154 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
1155 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
1156 let mut builder = Self::with_capacity(keys, vals, upds);
1157 for mut chunk in chain.drain(..) {
1158 builder.push(&mut chunk);
1159 }
1160
1161 builder.done(description)
1162 }
1163 }
1164
1165}