differential_dataflow/trace/implementations/
mod.rs1pub mod spine_fueled;
42
43pub mod merge_batcher;
44pub mod ord_neu;
45pub mod chunker;
46
47pub use self::chunker::ContainerChunker;
49pub use self::ord_neu::OrdValSpine as ValSpine;
50pub use self::ord_neu::OrdValBatcher as ValBatcher;
51pub use self::ord_neu::RcOrdValBuilder as ValBuilder;
52pub use self::ord_neu::OrdKeySpine as KeySpine;
53pub use self::ord_neu::OrdKeyBatcher as KeyBatcher;
54pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder;
55
56use std::convert::TryInto;
57
58use serde::{Deserialize, Serialize};
59use timely::container::{DrainContainer, PushInto};
60use timely::progress::Timestamp;
61
62use crate::lattice::Lattice;
63use crate::difference::Semigroup;
64
65pub trait Update {
67 type Key: Ord + Clone + 'static;
69 type Val: Ord + Clone + 'static;
71 type Time: Lattice + timely::progress::Timestamp;
73 type Diff: Ord + Semigroup + 'static;
75}
76
77impl<K,V,T,R> Update for ((K, V), T, R)
78where
79 K: Ord+Clone+'static,
80 V: Ord+Clone+'static,
81 T: Lattice+timely::progress::Timestamp,
82 R: Ord+Semigroup+'static,
83{
84 type Key = K;
85 type Val = V;
86 type Time = T;
87 type Diff = R;
88}
89
90pub trait Layout {
92 type KeyContainer: BatchContainer;
94 type ValContainer: BatchContainer;
96 type TimeContainer: BatchContainer<Owned: Lattice + timely::progress::Timestamp>;
98 type DiffContainer: BatchContainer<Owned: Semigroup + 'static>;
100 type OffsetContainer: for<'a> BatchContainer<ReadItem<'a> = usize>;
102}
103
104pub trait WithLayout {
106 type Layout: Layout;
108}
109
110pub trait LayoutExt : WithLayout<Layout: Layout<KeyContainer = Self::KeyContainer, ValContainer = Self::ValContainer, TimeContainer = Self::TimeContainer, DiffContainer = Self::DiffContainer>> {
112 type Key<'a>: Copy + Ord;
114 type ValOwn: Clone + Ord;
116 type Val<'a>: Copy + Ord;
118 type Time: Lattice + timely::progress::Timestamp;
120 type TimeGat<'a>: Copy + Ord;
122 type Diff: Semigroup + 'static;
124 type DiffGat<'a>: Copy + Ord;
126
127 type KeyContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Key<'a>>;
129 type ValContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Val<'a>, Owned = Self::ValOwn>;
131 type TimeContainer: for<'a> BatchContainer<ReadItem<'a> = Self::TimeGat<'a>, Owned = Self::Time>;
133 type DiffContainer: for<'a> BatchContainer<ReadItem<'a> = Self::DiffGat<'a>, Owned = Self::Diff>;
135
136 fn owned_val(val: Self::Val<'_>) -> Self::ValOwn;
138 fn owned_time(time: Self::TimeGat<'_>) -> Self::Time;
140 fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff;
142
143 fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time);
145}
146
147impl<L: WithLayout> LayoutExt for L {
148 type Key<'a> = <<L::Layout as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
149 type ValOwn = <<L::Layout as Layout>::ValContainer as BatchContainer>::Owned;
150 type Val<'a> = <<L::Layout as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
151 type Time = <<L::Layout as Layout>::TimeContainer as BatchContainer>::Owned;
152 type TimeGat<'a> = <<L::Layout as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
153 type Diff = <<L::Layout as Layout>::DiffContainer as BatchContainer>::Owned;
154 type DiffGat<'a> = <<L::Layout as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
155
156 type KeyContainer = <L::Layout as Layout>::KeyContainer;
157 type ValContainer = <L::Layout as Layout>::ValContainer;
158 type TimeContainer = <L::Layout as Layout>::TimeContainer;
159 type DiffContainer = <L::Layout as Layout>::DiffContainer;
160
161 #[inline(always)] fn owned_val(val: Self::Val<'_>) -> Self::ValOwn { <Self::Layout as Layout>::ValContainer::into_owned(val) }
162 #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { <Self::Layout as Layout>::TimeContainer::into_owned(time) }
163 #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { <Self::Layout as Layout>::DiffContainer::into_owned(diff) }
164 #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { <Self::Layout as Layout>::TimeContainer::clone_onto(time, onto) }
165
166}
167
168impl<KC, VC, TC, DC, OC> Layout for (KC, VC, TC, DC, OC)
171where
172 KC: BatchContainer,
173 VC: BatchContainer,
174 TC: BatchContainer<Owned: Lattice + timely::progress::Timestamp>,
175 DC: BatchContainer<Owned: Semigroup>,
176 OC: for<'a> BatchContainer<ReadItem<'a> = usize>,
177{
178 type KeyContainer = KC;
179 type ValContainer = VC;
180 type TimeContainer = TC;
181 type DiffContainer = DC;
182 type OffsetContainer = OC;
183}
184
185pub mod layout {
189 use crate::trace::implementations::{BatchContainer, Layout};
190
191 pub type Key<L> = <<L as Layout>::KeyContainer as BatchContainer>::Owned;
193 pub type KeyRef<'a, L> = <<L as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
195 pub type Val<L> = <<L as Layout>::ValContainer as BatchContainer>::Owned;
197 pub type ValRef<'a, L> = <<L as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
199 pub type Time<L> = <<L as Layout>::TimeContainer as BatchContainer>::Owned;
201 pub type TimeRef<'a, L> = <<L as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
203 pub type Diff<L> = <<L as Layout>::DiffContainer as BatchContainer>::Owned;
205 pub type DiffRef<'a, L> = <<L as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
207}
208
209pub struct Vector<U: Update> {
211 phantom: std::marker::PhantomData<U>,
212}
213
214impl<U: Update<Diff: Ord>> Layout for Vector<U> {
215 type KeyContainer = Vec<U::Key>;
216 type ValContainer = Vec<U::Val>;
217 type TimeContainer = Vec<U::Time>;
218 type DiffContainer = Vec<U::Diff>;
219 type OffsetContainer = OffsetList;
220}
221
222#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)]
224pub struct OffsetList {
225 pub zero_prefix: usize,
227 pub smol: Vec<u32>,
229 pub chonk: Vec<u64>,
231}
232
233impl std::fmt::Debug for OffsetList {
234 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235 f.debug_list().entries(self.into_iter()).finish()
236 }
237}
238
239impl OffsetList {
240 pub fn with_capacity(cap: usize) -> Self {
242 Self {
243 zero_prefix: 0,
244 smol: Vec::with_capacity(cap),
245 chonk: Vec::new(),
246 }
247 }
248 pub fn push(&mut self, offset: usize) {
250 if self.smol.is_empty() && self.chonk.is_empty() && offset == 0 {
251 self.zero_prefix += 1;
252 }
253 else if self.chonk.is_empty() {
254 if let Ok(smol) = offset.try_into() {
255 self.smol.push(smol);
256 }
257 else {
258 self.chonk.push(offset.try_into().unwrap())
259 }
260 }
261 else {
262 self.chonk.push(offset.try_into().unwrap())
263 }
264 }
265 pub fn index(&self, index: usize) -> usize {
267 if index < self.zero_prefix {
268 0
269 }
270 else if index - self.zero_prefix < self.smol.len() {
271 self.smol[index - self.zero_prefix].try_into().unwrap()
272 }
273 else {
274 self.chonk[index - self.zero_prefix - self.smol.len()].try_into().unwrap()
275 }
276 }
277 pub fn len(&self) -> usize {
279 self.zero_prefix + self.smol.len() + self.chonk.len()
280 }
281}
282
283impl<'a> IntoIterator for &'a OffsetList {
284 type Item = usize;
285 type IntoIter = OffsetListIter<'a>;
286
287 fn into_iter(self) -> Self::IntoIter {
288 OffsetListIter {list: self, index: 0 }
289 }
290}
291
292pub struct OffsetListIter<'a> {
294 list: &'a OffsetList,
295 index: usize,
296}
297
298impl<'a> Iterator for OffsetListIter<'a> {
299 type Item = usize;
300
301 fn next(&mut self) -> Option<Self::Item> {
302 if self.index < self.list.len() {
303 let res = Some(self.list.index(self.index));
304 self.index += 1;
305 res
306 } else {
307 None
308 }
309 }
310}
311
312impl PushInto<usize> for OffsetList {
313 fn push_into(&mut self, item: usize) {
314 self.push(item);
315 }
316}
317
318impl BatchContainer for OffsetList {
319 type Owned = usize;
320 type ReadItem<'a> = usize;
321
322 #[inline(always)]
323 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item }
324 #[inline(always)]
325 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
326
327 fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
328 fn push_own(&mut self, item: &Self::Owned) { self.push_into(*item) }
329
330 fn clear(&mut self) { self.zero_prefix = 0; self.smol.clear(); self.chonk.clear(); }
331
332 fn with_capacity(size: usize) -> Self {
333 Self::with_capacity(size)
334 }
335
336 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
337 Self::with_capacity(cont1.len() + cont2.len())
338 }
339
340 fn index(&self, index: usize) -> Self::ReadItem<'_> {
341 self.index(index)
342 }
343
344 fn len(&self) -> usize {
345 self.len()
346 }
347}
348
349pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: DrainContainer + Sized {
351 type Key<'a>: Ord;
353 type Val<'a>: Ord;
355 type Time;
357 type Diff;
359
360 fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff);
362
363 fn key_eq(this: &Self::Key<'_>, other: K::ReadItem<'_>) -> bool;
365
366 fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;
368
369 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize);
371}
372
373impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
374where
375 K: Ord + Clone + 'static,
376 KBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a K>>,
377 V: Ord + Clone + 'static,
378 VBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a V>>,
379 T: Timestamp + Lattice + 'static,
380 R: Ord + Semigroup + 'static,
381{
382 type Key<'a> = K;
383 type Val<'a> = V;
384 type Time = T;
385 type Diff = R;
386
387 fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
388 (key, val, time, diff)
389 }
390
391 fn key_eq(this: &K, other: KBC::ReadItem<'_>) -> bool {
392 KBC::reborrow(other) == this
393 }
394
395 fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
396 VBC::reborrow(other) == this
397 }
398
399 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
400 let mut keys = 0;
401 let mut vals = 0;
402 let mut upds = 0;
403 let mut prev_keyval = None;
404 for link in chain.iter() {
405 for ((key, val), _, _) in link.iter() {
406 if let Some((p_key, p_val)) = prev_keyval {
407 if p_key != key {
408 keys += 1;
409 vals += 1;
410 } else if p_val != val {
411 vals += 1;
412 }
413 } else {
414 keys += 1;
415 vals += 1;
416 }
417 upds += 1;
418 prev_keyval = Some((key, val));
419 }
420 }
421 (keys, vals, upds)
422 }
423}
424
425pub use self::containers::{BatchContainer, SliceContainer};
426
427pub mod containers {
429
430 use timely::container::PushInto;
431
432 pub trait BatchContainer: 'static {
434 type Owned: Clone + Ord;
436
437 type ReadItem<'a>: Copy + Ord;
439
440
441 #[must_use]
443 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned;
444 #[inline(always)]
446 fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
447 *other = Self::into_owned(item);
448 }
449
450 fn push_ref(&mut self, item: Self::ReadItem<'_>);
452 fn push_own(&mut self, item: &Self::Owned);
454
455 fn clear(&mut self);
457
458 fn with_capacity(size: usize) -> Self;
460 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self;
462
463 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b>;
465
466 fn index(&self, index: usize) -> Self::ReadItem<'_>;
468
469 fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
471 if index < self.len() {
472 Some(self.index(index))
473 }
474 else { None }
475 }
476
477 fn len(&self) -> usize;
479 fn last(&self) -> Option<Self::ReadItem<'_>> {
481 if self.len() > 0 {
482 Some(self.index(self.len()-1))
483 }
484 else {
485 None
486 }
487 }
488 fn is_empty(&self) -> bool { self.len() == 0 }
490
491 fn advance<F: for<'a> Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize {
498
499 let small_limit = 8;
500
501 if end > start + small_limit && function(self.index(start + small_limit)) {
503
504 let mut index = small_limit + 1;
506 if start + index < end && function(self.index(start + index)) {
507
508 let mut step = 1;
510 while start + index + step < end && function(self.index(start + index + step)) {
511 index += step;
512 step <<= 1;
513 }
514
515 step >>= 1;
517 while step > 0 {
518 if start + index + step < end && function(self.index(start + index + step)) {
519 index += step;
520 }
521 step >>= 1;
522 }
523
524 index += 1;
525 }
526
527 index
528 }
529 else {
530 let limit = std::cmp::min(end, start + small_limit);
531 (start .. limit).filter(|x| function(self.index(*x))).count()
532 }
533 }
534 }
535
536 impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
539 type Owned = T;
540 type ReadItem<'a> = &'a T;
541
542 #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() }
543 #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); }
544
545 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
546
547 fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
548 fn push_own(&mut self, item: &Self::Owned) { self.push_into(item.clone()) }
549
550 fn clear(&mut self) { self.clear() }
551
552 fn with_capacity(size: usize) -> Self {
553 Vec::with_capacity(size)
554 }
555 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
556 Vec::with_capacity(cont1.len() + cont2.len())
557 }
558 fn index(&self, index: usize) -> Self::ReadItem<'_> {
559 &self[index]
560 }
561 fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
562 <[T]>::get(self, index)
563 }
564 fn len(&self) -> usize {
565 self[..].len()
566 }
567 }
568
569 pub struct SliceContainer<B> {
571 offsets: Vec<usize>,
576 inner: Vec<B>,
578 }
579
580 impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
581 fn push_into(&mut self, item: &[B]) {
582 for x in item.iter() {
583 self.inner.push_into(x);
584 }
585 self.offsets.push(self.inner.len());
586 }
587 }
588
589 impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
590 fn push_into(&mut self, item: &Vec<B>) {
591 self.push_into(&item[..]);
592 }
593 }
594
595 impl<B> BatchContainer for SliceContainer<B>
596 where
597 B: Ord + Clone + Sized + 'static,
598 {
599 type Owned = Vec<B>;
600 type ReadItem<'a> = &'a [B];
601
602 #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.to_vec() }
603 #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from_slice(item); }
604
605 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
606
607 fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
608 fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }
609
610 fn clear(&mut self) {
611 self.offsets.clear();
612 self.offsets.push(0);
613 self.inner.clear();
614 }
615
616 fn with_capacity(size: usize) -> Self {
617 let mut offsets = Vec::with_capacity(size + 1);
618 offsets.push(0);
619 Self {
620 offsets,
621 inner: Vec::with_capacity(size),
622 }
623 }
624 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
625 let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1);
626 offsets.push(0);
627 Self {
628 offsets,
629 inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()),
630 }
631 }
632 fn index(&self, index: usize) -> Self::ReadItem<'_> {
633 let lower = self.offsets[index];
634 let upper = self.offsets[index+1];
635 &self.inner[lower .. upper]
636 }
637 fn len(&self) -> usize {
638 self.offsets.len() - 1
639 }
640 }
641
642 impl<B> Default for SliceContainer<B> {
644 fn default() -> Self {
645 Self {
646 offsets: vec![0],
647 inner: Default::default(),
648 }
649 }
650 }
651}