1pub mod spine_fueled;
42
43pub mod merge_batcher;
44pub mod ord_neu;
45pub mod rhh;
46pub mod huffman_container;
47pub mod chunker;
48
49pub use self::ord_neu::OrdValSpine as ValSpine;
51pub use self::ord_neu::OrdValBatcher as ValBatcher;
52pub use self::ord_neu::RcOrdValBuilder as ValBuilder;
53pub use self::ord_neu::OrdKeySpine as KeySpine;
54pub use self::ord_neu::OrdKeyBatcher as KeyBatcher;
55pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder;
56
57use std::convert::TryInto;
58
59use columnation::Columnation;
60use serde::{Deserialize, Serialize};
61use timely::container::{DrainContainer, PushInto};
62use timely::progress::Timestamp;
63
64use crate::containers::TimelyStack;
65use crate::lattice::Lattice;
66use crate::difference::Semigroup;
67
68pub trait Update {
70 type Key: Ord + Clone + 'static;
72 type Val: Ord + Clone + 'static;
74 type Time: Ord + Clone + Lattice + timely::progress::Timestamp;
76 type Diff: Ord + Semigroup + 'static;
78}
79
80impl<K,V,T,R> Update for ((K, V), T, R)
81where
82 K: Ord+Clone+'static,
83 V: Ord+Clone+'static,
84 T: Ord+Clone+Lattice+timely::progress::Timestamp,
85 R: Ord+Semigroup+'static,
86{
87 type Key = K;
88 type Val = V;
89 type Time = T;
90 type Diff = R;
91}
92
93pub trait Layout {
95 type KeyContainer: BatchContainer;
97 type ValContainer: BatchContainer;
99 type TimeContainer: BatchContainer<Owned: Lattice + timely::progress::Timestamp>;
101 type DiffContainer: BatchContainer<Owned: Semigroup + 'static>;
103 type OffsetContainer: for<'a> BatchContainer<ReadItem<'a> = usize>;
105}
106
107pub trait WithLayout {
109 type Layout: Layout;
111}
112
113pub trait LayoutExt : WithLayout<Layout: Layout<KeyContainer = Self::KeyContainer, ValContainer = Self::ValContainer, TimeContainer = Self::TimeContainer, DiffContainer = Self::DiffContainer>> {
115 type KeyOwn;
117 type Key<'a>: Copy + Ord;
119 type ValOwn: Clone + Ord;
121 type Val<'a>: Copy + Ord;
123 type Time: Lattice + timely::progress::Timestamp;
125 type TimeGat<'a>: Copy + Ord;
127 type Diff: Semigroup + 'static;
129 type DiffGat<'a>: Copy + Ord;
131
132 type KeyContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Key<'a>, Owned = Self::KeyOwn>;
134 type ValContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Val<'a>, Owned = Self::ValOwn>;
136 type TimeContainer: for<'a> BatchContainer<ReadItem<'a> = Self::TimeGat<'a>, Owned = Self::Time>;
138 type DiffContainer: for<'a> BatchContainer<ReadItem<'a> = Self::DiffGat<'a>, Owned = Self::Diff>;
140
141 fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn;
143 fn owned_val(val: Self::Val<'_>) -> Self::ValOwn;
145 fn owned_time(time: Self::TimeGat<'_>) -> Self::Time;
147 fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff;
149
150 fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time);
152}
153
154impl<L: WithLayout> LayoutExt for L {
155 type KeyOwn = <<L::Layout as Layout>::KeyContainer as BatchContainer>::Owned;
156 type Key<'a> = <<L::Layout as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
157 type ValOwn = <<L::Layout as Layout>::ValContainer as BatchContainer>::Owned;
158 type Val<'a> = <<L::Layout as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
159 type Time = <<L::Layout as Layout>::TimeContainer as BatchContainer>::Owned;
160 type TimeGat<'a> = <<L::Layout as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
161 type Diff = <<L::Layout as Layout>::DiffContainer as BatchContainer>::Owned;
162 type DiffGat<'a> = <<L::Layout as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
163
164 type KeyContainer = <L::Layout as Layout>::KeyContainer;
165 type ValContainer = <L::Layout as Layout>::ValContainer;
166 type TimeContainer = <L::Layout as Layout>::TimeContainer;
167 type DiffContainer = <L::Layout as Layout>::DiffContainer;
168
169 #[inline(always)] fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn { <Self::Layout as Layout>::KeyContainer::into_owned(key) }
170 #[inline(always)] fn owned_val(val: Self::Val<'_>) -> Self::ValOwn { <Self::Layout as Layout>::ValContainer::into_owned(val) }
171 #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { <Self::Layout as Layout>::TimeContainer::into_owned(time) }
172 #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { <Self::Layout as Layout>::DiffContainer::into_owned(diff) }
173 #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { <Self::Layout as Layout>::TimeContainer::clone_onto(time, onto) }
174
175}
176
177impl<KC, VC, TC, DC, OC> Layout for (KC, VC, TC, DC, OC)
180where
181 KC: BatchContainer,
182 VC: BatchContainer,
183 TC: BatchContainer<Owned: Lattice + timely::progress::Timestamp>,
184 DC: BatchContainer<Owned: Semigroup>,
185 OC: for<'a> BatchContainer<ReadItem<'a> = usize>,
186{
187 type KeyContainer = KC;
188 type ValContainer = VC;
189 type TimeContainer = TC;
190 type DiffContainer = DC;
191 type OffsetContainer = OC;
192}
193
194pub mod layout {
198 use crate::trace::implementations::{BatchContainer, Layout};
199
200 pub type Key<L> = <<L as Layout>::KeyContainer as BatchContainer>::Owned;
202 pub type KeyRef<'a, L> = <<L as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
204 pub type Val<L> = <<L as Layout>::ValContainer as BatchContainer>::Owned;
206 pub type ValRef<'a, L> = <<L as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
208 pub type Time<L> = <<L as Layout>::TimeContainer as BatchContainer>::Owned;
210 pub type TimeRef<'a, L> = <<L as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
212 pub type Diff<L> = <<L as Layout>::DiffContainer as BatchContainer>::Owned;
214 pub type DiffRef<'a, L> = <<L as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
216}
217
218pub struct Vector<U: Update> {
220 phantom: std::marker::PhantomData<U>,
221}
222
223impl<U: Update<Diff: Ord>> Layout for Vector<U> {
224 type KeyContainer = Vec<U::Key>;
225 type ValContainer = Vec<U::Val>;
226 type TimeContainer = Vec<U::Time>;
227 type DiffContainer = Vec<U::Diff>;
228 type OffsetContainer = OffsetList;
229}
230
231pub struct TStack<U: Update> {
233 phantom: std::marker::PhantomData<U>,
234}
235
236impl<U> Layout for TStack<U>
237where
238 U: Update<
239 Key: Columnation,
240 Val: Columnation,
241 Time: Columnation,
242 Diff: Columnation + Ord,
243 >,
244{
245 type KeyContainer = TimelyStack<U::Key>;
246 type ValContainer = TimelyStack<U::Val>;
247 type TimeContainer = TimelyStack<U::Time>;
248 type DiffContainer = TimelyStack<U::Diff>;
249 type OffsetContainer = OffsetList;
250}
251
252#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)]
254pub struct OffsetList {
255 pub zero_prefix: usize,
257 pub smol: Vec<u32>,
259 pub chonk: Vec<u64>,
261}
262
263impl std::fmt::Debug for OffsetList {
264 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265 f.debug_list().entries(self.into_iter()).finish()
266 }
267}
268
269impl OffsetList {
270 pub fn with_capacity(cap: usize) -> Self {
272 Self {
273 zero_prefix: 0,
274 smol: Vec::with_capacity(cap),
275 chonk: Vec::new(),
276 }
277 }
278 pub fn push(&mut self, offset: usize) {
280 if self.smol.is_empty() && self.chonk.is_empty() && offset == 0 {
281 self.zero_prefix += 1;
282 }
283 else if self.chonk.is_empty() {
284 if let Ok(smol) = offset.try_into() {
285 self.smol.push(smol);
286 }
287 else {
288 self.chonk.push(offset.try_into().unwrap())
289 }
290 }
291 else {
292 self.chonk.push(offset.try_into().unwrap())
293 }
294 }
295 pub fn index(&self, index: usize) -> usize {
297 if index < self.zero_prefix {
298 0
299 }
300 else if index - self.zero_prefix < self.smol.len() {
301 self.smol[index - self.zero_prefix].try_into().unwrap()
302 }
303 else {
304 self.chonk[index - self.zero_prefix - self.smol.len()].try_into().unwrap()
305 }
306 }
307 pub fn len(&self) -> usize {
309 self.zero_prefix + self.smol.len() + self.chonk.len()
310 }
311}
312
313impl<'a> IntoIterator for &'a OffsetList {
314 type Item = usize;
315 type IntoIter = OffsetListIter<'a>;
316
317 fn into_iter(self) -> Self::IntoIter {
318 OffsetListIter {list: self, index: 0 }
319 }
320}
321
322pub struct OffsetListIter<'a> {
324 list: &'a OffsetList,
325 index: usize,
326}
327
328impl<'a> Iterator for OffsetListIter<'a> {
329 type Item = usize;
330
331 fn next(&mut self) -> Option<Self::Item> {
332 if self.index < self.list.len() {
333 let res = Some(self.list.index(self.index));
334 self.index += 1;
335 res
336 } else {
337 None
338 }
339 }
340}
341
342impl PushInto<usize> for OffsetList {
343 fn push_into(&mut self, item: usize) {
344 self.push(item);
345 }
346}
347
348impl BatchContainer for OffsetList {
349 type Owned = usize;
350 type ReadItem<'a> = usize;
351
352 #[inline(always)]
353 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item }
354 #[inline(always)]
355 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
356
357 fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
358 fn push_own(&mut self, item: &Self::Owned) { self.push_into(*item) }
359
360 fn clear(&mut self) { self.zero_prefix = 0; self.smol.clear(); self.chonk.clear(); }
361
362 fn with_capacity(size: usize) -> Self {
363 Self::with_capacity(size)
364 }
365
366 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
367 Self::with_capacity(cont1.len() + cont2.len())
368 }
369
370 fn index(&self, index: usize) -> Self::ReadItem<'_> {
371 self.index(index)
372 }
373
374 fn len(&self) -> usize {
375 self.len()
376 }
377}
378
379pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: DrainContainer + Sized {
381 type Key<'a>: Ord;
383 type Val<'a>: Ord;
385 type Time;
387 type Diff;
389
390 fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff);
392
393 fn key_eq(this: &Self::Key<'_>, other: K::ReadItem<'_>) -> bool;
395
396 fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;
398
399 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize);
401}
402
403impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
404where
405 K: Ord + Clone + 'static,
406 KBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a K>>,
407 V: Ord + Clone + 'static,
408 VBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a V>>,
409 T: Timestamp + Lattice + Clone + 'static,
410 R: Ord + Semigroup + 'static,
411{
412 type Key<'a> = K;
413 type Val<'a> = V;
414 type Time = T;
415 type Diff = R;
416
417 fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
418 (key, val, time, diff)
419 }
420
421 fn key_eq(this: &K, other: KBC::ReadItem<'_>) -> bool {
422 KBC::reborrow(other) == this
423 }
424
425 fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
426 VBC::reborrow(other) == this
427 }
428
429 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
430 let mut keys = 0;
431 let mut vals = 0;
432 let mut upds = 0;
433 let mut prev_keyval = None;
434 for link in chain.iter() {
435 for ((key, val), _, _) in link.iter() {
436 if let Some((p_key, p_val)) = prev_keyval {
437 if p_key != key {
438 keys += 1;
439 vals += 1;
440 } else if p_val != val {
441 vals += 1;
442 }
443 } else {
444 keys += 1;
445 vals += 1;
446 }
447 upds += 1;
448 prev_keyval = Some((key, val));
449 }
450 }
451 (keys, vals, upds)
452 }
453}
454
455impl<K,V,T,R> BuilderInput<K, V> for TimelyStack<((K::Owned, V::Owned), T, R)>
456where
457 K: for<'a> BatchContainer<
458 ReadItem<'a>: PartialEq<&'a K::Owned>,
459 Owned: Ord + Columnation + Clone + 'static,
460 >,
461 V: for<'a> BatchContainer<
462 ReadItem<'a>: PartialEq<&'a V::Owned>,
463 Owned: Ord + Columnation + Clone + 'static,
464 >,
465 T: Timestamp + Lattice + Columnation + Clone + 'static,
466 R: Ord + Clone + Semigroup + Columnation + 'static,
467{
468 type Key<'a> = &'a K::Owned;
469 type Val<'a> = &'a V::Owned;
470 type Time = T;
471 type Diff = R;
472
473 fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
474 (key, val, time.clone(), diff.clone())
475 }
476
477 fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool {
478 K::reborrow(other) == *this
479 }
480
481 fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
482 V::reborrow(other) == *this
483 }
484
485 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
486 let mut keys = 0;
487 let mut vals = 0;
488 let mut upds = 0;
489 let mut prev_keyval = None;
490 for link in chain.iter() {
491 for ((key, val), _, _) in link.iter() {
492 if let Some((p_key, p_val)) = prev_keyval {
493 if p_key != key {
494 keys += 1;
495 vals += 1;
496 } else if p_val != val {
497 vals += 1;
498 }
499 } else {
500 keys += 1;
501 vals += 1;
502 }
503 upds += 1;
504 prev_keyval = Some((key, val));
505 }
506 }
507 (keys, vals, upds)
508 }
509}
510
511pub use self::containers::{BatchContainer, SliceContainer};
512
513pub mod containers {
515
516 use columnation::Columnation;
517 use timely::container::PushInto;
518
519 use crate::containers::TimelyStack;
520
521 pub trait BatchContainer: 'static {
523 type Owned: Clone + Ord;
525
526 type ReadItem<'a>: Copy + Ord;
528
529
530 #[must_use]
532 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned;
533 #[inline(always)]
535 fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
536 *other = Self::into_owned(item);
537 }
538
539 fn push_ref(&mut self, item: Self::ReadItem<'_>);
541 fn push_own(&mut self, item: &Self::Owned);
543
544 fn clear(&mut self);
546
547 fn with_capacity(size: usize) -> Self;
549 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self;
551
552 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b>;
554
555 fn index(&self, index: usize) -> Self::ReadItem<'_>;
557
558 fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
560 if index < self.len() {
561 Some(self.index(index))
562 }
563 else { None }
564 }
565
566 fn len(&self) -> usize;
568 fn last(&self) -> Option<Self::ReadItem<'_>> {
570 if self.len() > 0 {
571 Some(self.index(self.len()-1))
572 }
573 else {
574 None
575 }
576 }
577 fn is_empty(&self) -> bool { self.len() == 0 }
579
580 fn advance<F: for<'a> Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize {
587
588 let small_limit = 8;
589
590 if end > start + small_limit && function(self.index(start + small_limit)) {
592
593 let mut index = small_limit + 1;
595 if start + index < end && function(self.index(start + index)) {
596
597 let mut step = 1;
599 while start + index + step < end && function(self.index(start + index + step)) {
600 index += step;
601 step <<= 1;
602 }
603
604 step >>= 1;
606 while step > 0 {
607 if start + index + step < end && function(self.index(start + index + step)) {
608 index += step;
609 }
610 step >>= 1;
611 }
612
613 index += 1;
614 }
615
616 index
617 }
618 else {
619 let limit = std::cmp::min(end, start + small_limit);
620 (start .. limit).filter(|x| function(self.index(*x))).count()
621 }
622 }
623 }
624
625 impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
628 type Owned = T;
629 type ReadItem<'a> = &'a T;
630
631 #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() }
632 #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); }
633
634 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
635
636 fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
637 fn push_own(&mut self, item: &Self::Owned) { self.push_into(item.clone()) }
638
639 fn clear(&mut self) { self.clear() }
640
641 fn with_capacity(size: usize) -> Self {
642 Vec::with_capacity(size)
643 }
644 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
645 Vec::with_capacity(cont1.len() + cont2.len())
646 }
647 fn index(&self, index: usize) -> Self::ReadItem<'_> {
648 &self[index]
649 }
650 fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
651 <[T]>::get(&self, index)
652 }
653 fn len(&self) -> usize {
654 self[..].len()
655 }
656 }
657
658 impl<T: Clone + Ord + Columnation + 'static> BatchContainer for TimelyStack<T> {
661 type Owned = T;
662 type ReadItem<'a> = &'a T;
663
664 #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() }
665 #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); }
666
667 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
668
669 fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
670 fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }
671
672 fn clear(&mut self) { self.clear() }
673
674 fn with_capacity(size: usize) -> Self {
675 Self::with_capacity(size)
676 }
677 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
678 let mut new = Self::default();
679 new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
680 new
681 }
682 fn index(&self, index: usize) -> Self::ReadItem<'_> {
683 &self[index]
684 }
685 fn len(&self) -> usize {
686 self[..].len()
687 }
688 }
689
690 pub struct SliceContainer<B> {
692 offsets: Vec<usize>,
697 inner: Vec<B>,
699 }
700
701 impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
702 fn push_into(&mut self, item: &[B]) {
703 for x in item.iter() {
704 self.inner.push_into(x);
705 }
706 self.offsets.push(self.inner.len());
707 }
708 }
709
710 impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
711 fn push_into(&mut self, item: &Vec<B>) {
712 self.push_into(&item[..]);
713 }
714 }
715
716 impl<B> BatchContainer for SliceContainer<B>
717 where
718 B: Ord + Clone + Sized + 'static,
719 {
720 type Owned = Vec<B>;
721 type ReadItem<'a> = &'a [B];
722
723 #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.to_vec() }
724 #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from_slice(item); }
725
726 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
727
728 fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
729 fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }
730
731 fn clear(&mut self) {
732 self.offsets.clear();
733 self.offsets.push(0);
734 self.inner.clear();
735 }
736
737 fn with_capacity(size: usize) -> Self {
738 let mut offsets = Vec::with_capacity(size + 1);
739 offsets.push(0);
740 Self {
741 offsets,
742 inner: Vec::with_capacity(size),
743 }
744 }
745 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
746 let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1);
747 offsets.push(0);
748 Self {
749 offsets,
750 inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()),
751 }
752 }
753 fn index(&self, index: usize) -> Self::ReadItem<'_> {
754 let lower = self.offsets[index];
755 let upper = self.offsets[index+1];
756 &self.inner[lower .. upper]
757 }
758 fn len(&self) -> usize {
759 self.offsets.len() - 1
760 }
761 }
762
763 impl<B> Default for SliceContainer<B> {
765 fn default() -> Self {
766 Self {
767 offsets: vec![0],
768 inner: Default::default(),
769 }
770 }
771 }
772}