1pub mod spine_fueled;
42
43pub mod merge_batcher;
44pub mod ord_neu;
45pub mod rhh;
46pub mod huffman_container;
47pub mod chunker;
48
49pub use self::ord_neu::OrdValSpine as ValSpine;
51pub use self::ord_neu::OrdValBatcher as ValBatcher;
52pub use self::ord_neu::RcOrdValBuilder as ValBuilder;
53pub use self::ord_neu::OrdKeySpine as KeySpine;
54pub use self::ord_neu::OrdKeyBatcher as KeyBatcher;
55pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder;
56
57use std::borrow::{ToOwned};
58use std::convert::TryInto;
59
60use columnation::Columnation;
61use serde::{Deserialize, Serialize};
62use timely::Container;
63use timely::container::PushInto;
64use timely::progress::Timestamp;
65
66use crate::containers::TimelyStack;
67use crate::lattice::Lattice;
68use crate::difference::Semigroup;
69
70pub trait Update {
72 type Key: Ord + Clone + 'static;
74 type Val: Ord + Clone + 'static;
76 type Time: Ord + Clone + Lattice + timely::progress::Timestamp;
78 type Diff: Ord + Semigroup + 'static;
80}
81
82impl<K,V,T,R> Update for ((K, V), T, R)
83where
84 K: Ord+Clone+'static,
85 V: Ord+Clone+'static,
86 T: Ord+Clone+Lattice+timely::progress::Timestamp,
87 R: Ord+Semigroup+'static,
88{
89 type Key = K;
90 type Val = V;
91 type Time = T;
92 type Diff = R;
93}
94
95pub trait Layout {
97 type Target: Update + ?Sized;
99 type KeyContainer: BatchContainer + PushInto<<Self::Target as Update>::Key>;
102 type ValContainer: BatchContainer;
104 type TimeContainer: BatchContainer<Owned = <Self::Target as Update>::Time> + PushInto<<Self::Target as Update>::Time>;
106 type DiffContainer: BatchContainer<Owned = <Self::Target as Update>::Diff> + PushInto<<Self::Target as Update>::Diff>;
108 type OffsetContainer: for<'a> BatchContainer<ReadItem<'a> = usize>;
110}
111
112pub struct Vector<U: Update> {
114 phantom: std::marker::PhantomData<U>,
115}
116
117impl<U: Update> Layout for Vector<U>
118where
119 U::Diff: Ord,
120{
121 type Target = U;
122 type KeyContainer = Vec<U::Key>;
123 type ValContainer = Vec<U::Val>;
124 type TimeContainer = Vec<U::Time>;
125 type DiffContainer = Vec<U::Diff>;
126 type OffsetContainer = OffsetList;
127}
128
129pub struct TStack<U: Update> {
131 phantom: std::marker::PhantomData<U>,
132}
133
134impl<U: Update> Layout for TStack<U>
135where
136 U::Key: Columnation,
137 U::Val: Columnation,
138 U::Time: Columnation,
139 U::Diff: Columnation + Ord,
140{
141 type Target = U;
142 type KeyContainer = TimelyStack<U::Key>;
143 type ValContainer = TimelyStack<U::Val>;
144 type TimeContainer = TimelyStack<U::Time>;
145 type DiffContainer = TimelyStack<U::Diff>;
146 type OffsetContainer = OffsetList;
147}
148
149pub trait PreferredContainer : ToOwned {
153 type Container: BatchContainer + PushInto<Self::Owned>;
155}
156
157impl<T: Ord + Clone + 'static> PreferredContainer for T {
158 type Container = Vec<T>;
159}
160
161impl<T: Ord + Clone + 'static> PreferredContainer for [T] {
162 type Container = SliceContainer<T>;
163}
164
165pub struct Preferred<K: ?Sized, V: ?Sized, T, D> {
167 phantom: std::marker::PhantomData<(Box<K>, Box<V>, T, D)>,
168}
169
170impl<K,V,T,R> Update for Preferred<K, V, T, R>
171where
172 K: ToOwned + ?Sized,
173 K::Owned: Ord+Clone+'static,
174 V: ToOwned + ?Sized,
175 V::Owned: Ord+Clone+'static,
176 T: Ord+Clone+Lattice+timely::progress::Timestamp,
177 R: Ord+Clone+Semigroup+'static,
178{
179 type Key = K::Owned;
180 type Val = V::Owned;
181 type Time = T;
182 type Diff = R;
183}
184
185impl<K, V, T, D> Layout for Preferred<K, V, T, D>
186where
187 K: Ord+ToOwned+PreferredContainer + ?Sized,
188 K::Owned: Ord+Clone+'static,
189 V: Ord+ToOwned+PreferredContainer + ?Sized,
190 V::Owned: Ord+Clone+'static,
191 T: Ord+Clone+Lattice+timely::progress::Timestamp,
192 D: Ord+Clone+Semigroup+'static,
193{
194 type Target = Preferred<K, V, T, D>;
195 type KeyContainer = K::Container;
196 type ValContainer = V::Container;
197 type TimeContainer = Vec<T>;
198 type DiffContainer = Vec<D>;
199 type OffsetContainer = OffsetList;
200}
201
202#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)]
204pub struct OffsetList {
205 pub zero_prefix: usize,
207 pub smol: Vec<u32>,
209 pub chonk: Vec<u64>,
211}
212
213impl std::fmt::Debug for OffsetList {
214 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215 f.debug_list().entries(self.into_iter()).finish()
216 }
217}
218
219impl OffsetList {
220 pub fn with_capacity(cap: usize) -> Self {
222 Self {
223 zero_prefix: 0,
224 smol: Vec::with_capacity(cap),
225 chonk: Vec::new(),
226 }
227 }
228 pub fn push(&mut self, offset: usize) {
230 if self.smol.is_empty() && self.chonk.is_empty() && offset == 0 {
231 self.zero_prefix += 1;
232 }
233 else if self.chonk.is_empty() {
234 if let Ok(smol) = offset.try_into() {
235 self.smol.push(smol);
236 }
237 else {
238 self.chonk.push(offset.try_into().unwrap())
239 }
240 }
241 else {
242 self.chonk.push(offset.try_into().unwrap())
243 }
244 }
245 pub fn index(&self, index: usize) -> usize {
247 if index < self.zero_prefix {
248 0
249 }
250 else if index - self.zero_prefix < self.smol.len() {
251 self.smol[index - self.zero_prefix].try_into().unwrap()
252 }
253 else {
254 self.chonk[index - self.zero_prefix - self.smol.len()].try_into().unwrap()
255 }
256 }
257 pub fn len(&self) -> usize {
259 self.zero_prefix + self.smol.len() + self.chonk.len()
260 }
261}
262
263impl<'a> IntoIterator for &'a OffsetList {
264 type Item = usize;
265 type IntoIter = OffsetListIter<'a>;
266
267 fn into_iter(self) -> Self::IntoIter {
268 OffsetListIter {list: self, index: 0 }
269 }
270}
271
272pub struct OffsetListIter<'a> {
274 list: &'a OffsetList,
275 index: usize,
276}
277
278impl<'a> Iterator for OffsetListIter<'a> {
279 type Item = usize;
280
281 fn next(&mut self) -> Option<Self::Item> {
282 if self.index < self.list.len() {
283 let res = Some(self.list.index(self.index));
284 self.index += 1;
285 res
286 } else {
287 None
288 }
289 }
290}
291
292impl PushInto<usize> for OffsetList {
293 fn push_into(&mut self, item: usize) {
294 self.push(item);
295 }
296}
297
298impl BatchContainer for OffsetList {
299 type Owned = usize;
300 type ReadItem<'a> = usize;
301
302 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
303
304 fn with_capacity(size: usize) -> Self {
305 Self::with_capacity(size)
306 }
307
308 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
309 Self::with_capacity(cont1.len() + cont2.len())
310 }
311
312 fn index(&self, index: usize) -> Self::ReadItem<'_> {
313 self.index(index)
314 }
315
316 fn len(&self) -> usize {
317 self.len()
318 }
319}
320
321pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: Container {
323 type Key<'a>: Ord;
325 type Val<'a>: Ord;
327 type Time;
329 type Diff;
331
332 fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff);
334
335 fn key_eq(this: &Self::Key<'_>, other: K::ReadItem<'_>) -> bool;
337
338 fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;
340
341 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize);
343}
344
345impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
346where
347 K: Ord + Clone + 'static,
348 KBC: BatchContainer,
349 for<'a> KBC::ReadItem<'a>: PartialEq<&'a K>,
350 V: Ord + Clone + 'static,
351 VBC: BatchContainer,
352 for<'a> VBC::ReadItem<'a>: PartialEq<&'a V>,
353 T: Timestamp + Lattice + Clone + 'static,
354 R: Ord + Semigroup + 'static,
355{
356 type Key<'a> = K;
357 type Val<'a> = V;
358 type Time = T;
359 type Diff = R;
360
361 fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
362 (key, val, time, diff)
363 }
364
365 fn key_eq(this: &K, other: KBC::ReadItem<'_>) -> bool {
366 KBC::reborrow(other) == this
367 }
368
369 fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
370 VBC::reborrow(other) == this
371 }
372
373 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
374 let mut keys = 0;
375 let mut vals = 0;
376 let mut upds = 0;
377 let mut prev_keyval = None;
378 for link in chain.iter() {
379 for ((key, val), _, _) in link.iter() {
380 if let Some((p_key, p_val)) = prev_keyval {
381 if p_key != key {
382 keys += 1;
383 vals += 1;
384 } else if p_val != val {
385 vals += 1;
386 }
387 } else {
388 keys += 1;
389 vals += 1;
390 }
391 upds += 1;
392 prev_keyval = Some((key, val));
393 }
394 }
395 (keys, vals, upds)
396 }
397}
398
399impl<K,V,T,R> BuilderInput<K, V> for TimelyStack<((K::Owned, V::Owned), T, R)>
400where
401 K: BatchContainer,
402 for<'a> K::ReadItem<'a>: PartialEq<&'a K::Owned>,
403 K::Owned: Ord + Columnation + Clone + 'static,
404 V: BatchContainer,
405 for<'a> V::ReadItem<'a>: PartialEq<&'a V::Owned>,
406 V::Owned: Ord + Columnation + Clone + 'static,
407 T: Timestamp + Lattice + Columnation + Clone + 'static,
408 R: Ord + Clone + Semigroup + Columnation + 'static,
409{
410 type Key<'a> = &'a K::Owned;
411 type Val<'a> = &'a V::Owned;
412 type Time = T;
413 type Diff = R;
414
415 fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
416 (key, val, time.clone(), diff.clone())
417 }
418
419 fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool {
420 K::reborrow(other) == *this
421 }
422
423 fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
424 V::reborrow(other) == *this
425 }
426
427 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
428 let mut keys = 0;
429 let mut vals = 0;
430 let mut upds = 0;
431 let mut prev_keyval = None;
432 for link in chain.iter() {
433 for ((key, val), _, _) in link.iter() {
434 if let Some((p_key, p_val)) = prev_keyval {
435 if p_key != key {
436 keys += 1;
437 vals += 1;
438 } else if p_val != val {
439 vals += 1;
440 }
441 } else {
442 keys += 1;
443 vals += 1;
444 }
445 upds += 1;
446 prev_keyval = Some((key, val));
447 }
448 }
449 (keys, vals, upds)
450 }
451}
452
453pub use self::containers::{BatchContainer, SliceContainer};
454
455pub mod containers {
457
458 use columnation::Columnation;
459 use timely::container::PushInto;
460
461 use crate::containers::TimelyStack;
462 use crate::IntoOwned;
463
464 pub trait BatchContainer: for<'a> PushInto<Self::ReadItem<'a>> + 'static {
466 type Owned;
468
469 type ReadItem<'a>: Copy + Ord + IntoOwned<'a, Owned = Self::Owned>;
471
472 fn push<D>(&mut self, item: D) where Self: PushInto<D> {
474 self.push_into(item);
475 }
476 fn with_capacity(size: usize) -> Self;
478 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self;
480
481 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b>;
483
484 fn index(&self, index: usize) -> Self::ReadItem<'_>;
486
487 fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
489 if index < self.len() {
490 Some(self.index(index))
491 }
492 else { None }
493 }
494
495 fn len(&self) -> usize;
497 fn last(&self) -> Option<Self::ReadItem<'_>> {
499 if self.len() > 0 {
500 Some(self.index(self.len()-1))
501 }
502 else {
503 None
504 }
505 }
506 fn is_empty(&self) -> bool { self.len() == 0 }
508
509 fn advance<F: for<'a> Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize {
516
517 let small_limit = 8;
518
519 if end > start + small_limit && function(self.index(start + small_limit)) {
521
522 let mut index = small_limit + 1;
524 if start + index < end && function(self.index(start + index)) {
525
526 let mut step = 1;
528 while start + index + step < end && function(self.index(start + index + step)) {
529 index += step;
530 step <<= 1;
531 }
532
533 step >>= 1;
535 while step > 0 {
536 if start + index + step < end && function(self.index(start + index + step)) {
537 index += step;
538 }
539 step >>= 1;
540 }
541
542 index += 1;
543 }
544
545 index
546 }
547 else {
548 let limit = std::cmp::min(end, start + small_limit);
549 (start .. limit).filter(|x| function(self.index(*x))).count()
550 }
551 }
552 }
553
554 impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
557 type Owned = T;
558 type ReadItem<'a> = &'a T;
559
560 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
561
562 fn with_capacity(size: usize) -> Self {
563 Vec::with_capacity(size)
564 }
565 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
566 Vec::with_capacity(cont1.len() + cont2.len())
567 }
568 fn index(&self, index: usize) -> Self::ReadItem<'_> {
569 &self[index]
570 }
571 fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
572 <[T]>::get(&self, index)
573 }
574 fn len(&self) -> usize {
575 self[..].len()
576 }
577 }
578
579 impl<T: Clone + Ord + Columnation + 'static> BatchContainer for TimelyStack<T> {
582 type Owned = T;
583 type ReadItem<'a> = &'a T;
584
585 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
586
587 fn with_capacity(size: usize) -> Self {
588 Self::with_capacity(size)
589 }
590 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
591 let mut new = Self::default();
592 new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
593 new
594 }
595 fn index(&self, index: usize) -> Self::ReadItem<'_> {
596 &self[index]
597 }
598 fn len(&self) -> usize {
599 self[..].len()
600 }
601 }
602
603 pub struct SliceContainer<B> {
605 offsets: Vec<usize>,
610 inner: Vec<B>,
612 }
613
614 impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
615 fn push_into(&mut self, item: &[B]) {
616 for x in item.iter() {
617 self.inner.push_into(x);
618 }
619 self.offsets.push(self.inner.len());
620 }
621 }
622
623 impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
624 fn push_into(&mut self, item: &Vec<B>) {
625 self.push_into(&item[..]);
626 }
627 }
628
629 impl<B> PushInto<Vec<B>> for SliceContainer<B> {
630 fn push_into(&mut self, item: Vec<B>) {
631 for x in item.into_iter() {
632 self.inner.push(x);
633 }
634 self.offsets.push(self.inner.len());
635 }
636 }
637
638 impl<B> BatchContainer for SliceContainer<B>
639 where
640 B: Ord + Clone + Sized + 'static,
641 {
642 type Owned = Vec<B>;
643 type ReadItem<'a> = &'a [B];
644
645 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
646
647 fn with_capacity(size: usize) -> Self {
648 let mut offsets = Vec::with_capacity(size + 1);
649 offsets.push(0);
650 Self {
651 offsets,
652 inner: Vec::with_capacity(size),
653 }
654 }
655 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
656 let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1);
657 offsets.push(0);
658 Self {
659 offsets,
660 inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()),
661 }
662 }
663 fn index(&self, index: usize) -> Self::ReadItem<'_> {
664 let lower = self.offsets[index];
665 let upper = self.offsets[index+1];
666 &self.inner[lower .. upper]
667 }
668 fn len(&self) -> usize {
669 self.offsets.len() - 1
670 }
671 }
672
673 impl<B> Default for SliceContainer<B> {
675 fn default() -> Self {
676 Self {
677 offsets: vec![0],
678 inner: Default::default(),
679 }
680 }
681 }
682}