1pub mod spine_fueled;
42
43pub mod merge_batcher;
44pub mod merge_batcher_col;
45
46pub use self::merge_batcher::MergeBatcher as Batcher;
47
48pub mod ord_neu;
49pub mod rhh;
50pub mod huffman_container;
51
52pub use self::ord_neu::OrdValSpine as ValSpine;
54pub use self::ord_neu::OrdKeySpine as KeySpine;
55
56use std::borrow::{ToOwned};
57
58use timely::container::columnation::{Columnation, TimelyStack};
59use crate::lattice::Lattice;
60use crate::difference::Semigroup;
61
/// Names the four component types of an update: key, value, time, and difference.
pub trait Update {
    /// Key type: totally ordered, cloneable, and `'static`.
    type Key: Ord + Clone + 'static;
    /// Value type: totally ordered, cloneable, and `'static`.
    type Val: Ord + Clone + 'static;
    /// Time type: an ordered, cloneable timely `Timestamp` that is also a `Lattice`.
    type Time: Ord+Lattice+timely::progress::Timestamp+Clone;
    /// Difference type: a cloneable `Semigroup`.
    type Diff: Semigroup+Clone;
}
73
/// Implements `Update` for `((key, val), time, diff)` tuples.
///
/// Each tuple component is used directly as the matching associated type; the bounds
/// here are exactly the bounds `Update` places on those associated types.
impl<K,V,T,R> Update for ((K, V), T, R)
where
    K: Ord+Clone+'static,
    V: Ord+Clone+'static,
    T: Ord+Lattice+timely::progress::Timestamp+Clone,
    R: Semigroup+Clone,
{
    type Key = K;
    type Val = V;
    type Time = T;
    type Diff = R;
}
86
/// Pairs an `Update` type with the `BatchContainer` types used to hold its components.
pub trait Layout {
    /// The update type whose components the containers hold; it may be unsized.
    type Target: Update + ?Sized;
    /// Container whose pushed items are the target's keys.
    type KeyContainer:
        BatchContainer<PushItem=<Self::Target as Update>::Key>;
    /// Container whose pushed items are the target's values.
    type ValContainer:
        BatchContainer<PushItem=<Self::Target as Update>::Val>;
    /// Container whose pushed items are `(Time, Diff)` pairs.
    ///
    /// Unlike the key and value containers, its `ReadItem` is pinned to a plain
    /// reference to the stored `(Time, Diff)` pair, for every lifetime.
    type UpdContainer:
        for<'a> BatchContainer<PushItem=(<Self::Target as Update>::Time, <Self::Target as Update>::Diff), ReadItem<'a> = &'a (<Self::Target as Update>::Time, <Self::Target as Update>::Diff)>;
}
101
/// A `Layout` marker type that stores every component in a `Vec`.
pub struct Vector<U: Update> {
    /// Zero-sized marker tying the layout to `U`; no `U` value is stored.
    phantom: std::marker::PhantomData<U>,
}

/// Keys, values, and `(time, diff)` pairs of `U` are each held in a `Vec`.
impl<U: Update> Layout for Vector<U>
where
    U::Key: 'static,
    U::Val: 'static,
{
    type Target = U;
    type KeyContainer = Vec<U::Key>;
    type ValContainer = Vec<U::Val>;
    type UpdContainer = Vec<(U::Time, U::Diff)>;
}
117
/// A `Layout` marker type that stores every component in a `TimelyStack`.
pub struct TStack<U: Update> {
    /// Zero-sized marker tying the layout to `U`; no `U` value is stored.
    phantom: std::marker::PhantomData<U>,
}

/// Keys, values, and `(time, diff)` pairs of `U` are each held in a `TimelyStack`.
///
/// Every component type must implement `Columnation` to be usable with this layout.
impl<U: Update> Layout for TStack<U>
where
    U::Key: Columnation + 'static,
    U::Val: Columnation + 'static,
    U::Time: Columnation,
    U::Diff: Columnation,
{
    type Target = U;
    type KeyContainer = TimelyStack<U::Key>;
    type ValContainer = TimelyStack<U::Val>;
    type UpdContainer = TimelyStack<(U::Time, U::Diff)>;
}
135
/// A type that names the `BatchContainer` to use for storing its owned form.
pub trait PreferredContainer : ToOwned {
    /// The chosen container; the items pushed into it are `Self::Owned` values.
    type Container: BatchContainer<PushItem=Self::Owned>;
}

/// Sized, ordered, cloneable types are stored in a plain `Vec`.
impl<T: Ord + Clone + 'static> PreferredContainer for T {
    type Container = Vec<T>;
}

/// Slices are stored in a `SliceContainer2`, whose pushed items are `Vec<T>`.
impl<T: Ord + Clone + 'static> PreferredContainer for [T] {
    type Container = SliceContainer2<T>;
}
151
/// An `Update` and `Layout` marker type whose key and value types may be unsized.
pub struct Preferred<K: ?Sized, V: ?Sized, T, D> {
    /// Zero-sized marker; `K` and `V` are wrapped in `Box`, presumably so that unsized
    /// types can appear inside the tuple.
    phantom: std::marker::PhantomData<(Box<K>, Box<V>, T, D)>,
}

/// The update's key and value types are the owned forms of `K` and `V`.
impl<K,V,T,R> Update for Preferred<K, V, T, R>
where
    K: ToOwned + ?Sized,
    K::Owned: Ord+Clone+'static,
    V: ToOwned + ?Sized + 'static,
    V::Owned: Ord+Clone,
    T: Ord+Lattice+timely::progress::Timestamp+Clone,
    R: Semigroup+Clone,
{
    type Key = K::Owned;
    type Val = V::Owned;
    type Time = T;
    type Diff = R;
}

/// Keys and values use the containers named by their `PreferredContainer` impls;
/// `(time, diff)` pairs are held in a `Vec`.
impl<K, V, T, D> Layout for Preferred<K, V, T, D>
where
    K: Ord+ToOwned+PreferredContainer + ?Sized,
    K::Owned: Ord+Clone+'static,
    V: Ord+ToOwned+PreferredContainer + ?Sized + 'static,
    V::Owned: Ord+Clone,
    T: Ord+Lattice+timely::progress::Timestamp+Clone,
    D: Semigroup+Clone,
{
    type Target = Preferred<K, V, T, D>;
    type KeyContainer = K::Container;
    type ValContainer = V::Container;
    type UpdContainer = Vec<(T, D)>;
}
187
188use std::convert::TryInto;
189use abomonation_derive::Abomonation;
190
/// A list of offsets stored in two parts: `u32` entries followed by `u64` entries.
///
/// Logical position `i` is `smol[i]` when `i < smol.len()`, and
/// `chonk[i - smol.len()]` otherwise.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug, Abomonation)]
pub struct OffsetList {
    /// Leading entries, each small enough to be held as a `u32`.
    pub smol: Vec<u32>,
    /// Trailing entries, held as `u64`.
    pub chonk: Vec<u64>,
}
199
200impl OffsetList {
201 pub fn push(&mut self, offset: usize) {
203 if self.chonk.is_empty() {
204 if let Ok(smol) = offset.try_into() {
205 self.smol.push(smol);
206 }
207 else {
208 self.chonk.push(offset.try_into().unwrap())
209 }
210 }
211 else {
212 self.chonk.push(offset.try_into().unwrap())
213 }
214 }
215 pub fn index(&self, index: usize) -> usize {
217 if index < self.smol.len() {
218 self.smol[index].try_into().unwrap()
219 }
220 else {
221 self.chonk[index - self.smol.len()].try_into().unwrap()
222 }
223 }
224 pub fn set(&mut self, index: usize, offset: usize) {
228 if index < self.smol.len() {
229 if let Ok(off) = offset.try_into() {
230 self.smol[index] = off;
231 }
232 else {
233 self.chonk.splice(0..0, self.smol.drain(index ..).map(|x| x.try_into().unwrap()));
235 self.chonk[index - self.smol.len()] = offset.try_into().unwrap();
236 }
237 }
238 else {
239 self.chonk[index - self.smol.len()] = offset.try_into().unwrap();
240 }
241 }
242 pub fn last(&self) -> Option<usize> {
244 if self.chonk.is_empty() {
245 self.smol.last().map(|x| (*x).try_into().unwrap())
246 }
247 else {
248 self.chonk.last().map(|x| (*x).try_into().unwrap())
249 }
250 }
251 pub fn len(&self) -> usize {
253 self.smol.len() + self.chonk.len()
254 }
255 pub fn with_capacity(cap: usize) -> Self {
257 Self {
258 smol: Vec::with_capacity(cap),
259 chonk: Vec::new(),
260 }
261 }
262 pub fn truncate(&mut self, length: usize) {
264 if length > self.smol.len() {
265 self.chonk.truncate(length - self.smol.len());
266 }
267 else {
268 assert!(self.chonk.is_empty());
269 self.smol.truncate(length);
270 }
271 }
272}
273
274pub use self::containers::{BatchContainer, SliceContainer, SliceContainer2};
275
276pub mod containers {
278
279 use timely::container::columnation::{Columnation, TimelyStack};
280
281 use std::borrow::{Borrow, ToOwned};
282 use crate::trace::MyTrait;
283
/// A container that items can be appended to and read back from by position.
pub trait BatchContainer: Default + 'static {
    /// The owned type of item that can be pushed into the container.
    type PushItem;
    /// The type produced when reading an item back.
    ///
    /// It must be `Copy`, convertible to `PushItem` through `MyTrait`, and comparable
    /// with read items of any other lifetime.
    type ReadItem<'a>: Copy + MyTrait<'a, Owned = Self::PushItem> + for<'b> PartialOrd<Self::ReadItem<'b>>;
    /// Appends an owned item.
    fn push(&mut self, item: Self::PushItem);
    /// Appends an item given as a reference to its owned form.
    fn copy_push(&mut self, item: &Self::PushItem);
    /// Appends an item given in its read form.
    fn copy(&mut self, item: Self::ReadItem<'_>);
    /// Appends the items of `slice`.
    fn copy_slice(&mut self, slice: &[Self::PushItem]);
    /// Appends the items of `other` at positions `start .. end`.
    fn copy_range(&mut self, other: &Self, start: usize, end: usize);
    /// Creates an empty container sized according to `size`.
    fn with_capacity(size: usize) -> Self;
    /// Requests room for `additional` more items; the effect is up to the implementation.
    fn reserve(&mut self, additional: usize);
    /// Creates an empty container sized with reference to two existing containers.
    fn merge_capacity(cont1: &Self, cont2: &Self) -> Self;

    /// Returns the item at position `index`.
    fn index(&self, index: usize) -> Self::ReadItem<'_>;
    /// Returns the number of items in the container.
    fn len(&self) -> usize;
    /// Returns the final item, or `None` when the container is empty.
    fn last(&self) -> Option<Self::ReadItem<'_>> {
        if self.len() > 0 {
            Some(self.index(self.len()-1))
        }
        else {
            None
        }
    }

    /// Returns how many items starting at `start` (and before `end`) satisfy `function`.
    ///
    /// NOTE(review): beyond the first few positions only selected items are probed, so
    /// the result is a true count only when `function` holds for a prefix of
    /// `start .. end` and fails for everything after it — confirm with callers.
    fn advance<F: for<'a> Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize {

        // Number of leading positions handled by the plain scan in the `else` branch.
        let small_limit = 8;

        // Probe the item at offset `small_limit`; if it passes, search outward from there.
        if end > start + small_limit && function(self.index(start + small_limit)) {

            // Offsets `0 ..= small_limit` are taken as passing, so the answer is at least this.
            let mut index = small_limit + 1;
            if start + index < end && function(self.index(start + index)) {

                // From here on, the item at offset `index` is known to pass.
                // Jump forward, doubling the step each time the probed item passes.
                let mut step = 1;
                while start + index + step < end && function(self.index(start + index + step)) {
                    index += step;
                    step <<= 1;
                }

                // The last probe failed or ran past `end`; retry with ever smaller steps.
                step >>= 1;
                while step > 0 {
                    if start + index + step < end && function(self.index(start + index + step)) {
                        index += step;
                    }
                    step >>= 1;
                }

                // `index` is the offset of the last passing item; add one to make it a count.
                index += 1;
            }

            index
        }
        else {
            // Short range, or the probe at `small_limit` failed: test each of the first
            // `small_limit` positions (clamped to `end`) and count the ones that pass.
            let limit = std::cmp::min(end, start + small_limit);
            (start .. limit).filter(|x| function(self.index(*x))).count()
        }
    }
}
367
368 impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
371 type PushItem = T;
372 type ReadItem<'a> = &'a Self::PushItem;
373
374 fn push(&mut self, item: T) {
375 self.push(item);
376 }
377 fn copy_push(&mut self, item: &T) {
378 self.copy(item);
379 }
380 fn copy(&mut self, item: &T) {
381 self.push(item.clone());
382 }
383 fn copy_slice(&mut self, slice: &[T]) {
384 self.extend_from_slice(slice);
385 }
386 fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
387 self.extend_from_slice(&other[start .. end]);
388 }
389 fn with_capacity(size: usize) -> Self {
390 Vec::with_capacity(size)
391 }
392 fn reserve(&mut self, additional: usize) {
393 self.reserve(additional);
394 }
395 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
396 Vec::with_capacity(cont1.len() + cont2.len())
397 }
398 fn index(&self, index: usize) -> Self::ReadItem<'_> {
399 &self[index]
400 }
401 fn len(&self) -> usize {
402 self[..].len()
403 }
404 }
405
/// `TimelyStack<T>` as a `BatchContainer`: items are read back by reference.
///
/// NOTE(review): several bodies call `self.copy(..)` / `Self::with_capacity(..)`, which
/// share names with the trait methods being defined. They presumably resolve to inherent
/// `TimelyStack` methods rather than recursing — confirm against the timely API.
impl<T: Ord + Columnation + ToOwned<Owned = T> + 'static> BatchContainer for TimelyStack<T> {
    type PushItem = T;
    type ReadItem<'a> = &'a Self::PushItem;

    /// Copies the owned `item` in through a borrow; `item` itself is dropped on return.
    fn push(&mut self, item: Self::PushItem) {
        self.copy(item.borrow());
    }
    /// Copies the referenced item in.
    fn copy_push(&mut self, item: &Self::PushItem) {
        self.copy(item);
    }
    /// Copies the referenced item in.
    fn copy(&mut self, item: &T) {
        self.copy(item);
    }
    /// Calls `reserve_items` for the whole slice, then copies each element in.
    fn copy_slice(&mut self, slice: &[Self::PushItem]) {
        self.reserve_items(slice.iter());
        for item in slice.iter() {
            self.copy(item);
        }
    }
    /// Calls `reserve_items` for `other[start .. end]`, then copies each element in.
    fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
        let slice = &other[start .. end];
        self.reserve_items(slice.iter());
        for item in slice.iter() {
            self.copy(item);
        }
    }
    /// Creates an empty stack by forwarding `size` to `Self::with_capacity`.
    fn with_capacity(size: usize) -> Self {
        Self::with_capacity(size)
    }
    /// Does nothing; the requested amount is ignored.
    fn reserve(&mut self, _additional: usize) {
    }
    /// Starts from a default stack and calls `reserve_regions` with both inputs.
    fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
        let mut new = Self::default();
        new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
        new
    }
    /// Returns a reference to the item at `index`.
    fn index(&self, index: usize) -> Self::ReadItem<'_> {
        &self[index]
    }
    /// Returns the number of items, measured as the length of the full slice view.
    fn len(&self) -> usize {
        self[..].len()
    }
}
451
/// A container of slices, stored back to back in one `Vec` with a list of boundaries.
pub struct SliceContainer<B> {
    /// Boundaries into `inner`: slice `i` is `inner[offsets[i] .. offsets[i + 1]]`.
    ///
    /// Every constructor in this file starts the list with a single `0`.
    offsets: Vec<usize>,
    /// The elements of all stored slices, concatenated in insertion order.
    inner: Vec<B>,
}
462
463 impl<B> BatchContainer for SliceContainer<B>
464 where
465 B: Ord + Clone + Sized + 'static,
466 {
467 type PushItem = Vec<B>;
468 type ReadItem<'a> = &'a [B];
469 fn push(&mut self, item: Vec<B>) {
470 for x in item.into_iter() {
471 self.inner.push(x);
472 }
473 self.offsets.push(self.inner.len());
474 }
475 fn copy_push(&mut self, item: &Vec<B>) {
476 self.copy(&item[..]);
477 }
478 fn copy(&mut self, item: Self::ReadItem<'_>) {
479 for x in item.iter() {
480 self.inner.copy(x);
481 }
482 self.offsets.push(self.inner.len());
483 }
484 fn copy_slice(&mut self, slice: &[Vec<B>]) {
485 for item in slice {
486 self.copy(item);
487 }
488 }
489 fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
490 for index in start .. end {
491 self.copy(other.index(index));
492 }
493 }
494 fn with_capacity(size: usize) -> Self {
495 let mut offsets = Vec::with_capacity(size + 1);
496 offsets.push(0);
497 Self {
498 offsets,
499 inner: Vec::with_capacity(size),
500 }
501 }
502 fn reserve(&mut self, _additional: usize) {
503 }
504 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
505 let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1);
506 offsets.push(0);
507 Self {
508 offsets,
509 inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()),
510 }
511 }
512 fn index(&self, index: usize) -> Self::ReadItem<'_> {
513 let lower = self.offsets[index];
514 let upper = self.offsets[index+1];
515 &self.inner[lower .. upper]
516 }
517 fn len(&self) -> usize {
518 self.offsets.len() - 1
519 }
520 }
521
522 impl<B> Default for SliceContainer<B> {
524 fn default() -> Self {
525 Self {
526 offsets: vec![0],
527 inner: Default::default(),
528 }
529 }
530 }
531
/// A container of slices that also carries a string, which it lends out with every read.
pub struct SliceContainer2<B> {
    /// Text handed out alongside each slice that is read from the container.
    text: String,
    /// Boundaries into `inner`: slice `i` is `inner[offsets[i] .. offsets[i + 1]]`.
    ///
    /// Every constructor in this file starts the list with a single `0`.
    offsets: Vec<usize>,
    /// The elements of all stored slices, concatenated in insertion order.
    inner: Vec<B>,
}
543
/// A borrowed slice paired with an optional borrowed string.
///
/// Equality and ordering look only at `slice`; `text` never takes part in a comparison.
pub struct Greetings<'a, B> {
    /// Optional text carried alongside the slice.
    pub text: Option<&'a str>,
    /// The borrowed elements.
    pub slice: &'a [B],
}

// `Copy` and `Clone` are written by hand so that they hold for every `B`; deriving
// them would add `B: Copy` / `B: Clone` bounds that two shared references do not need.
impl<'a, B> Copy for Greetings<'a, B> { }
impl<'a, B> Clone for Greetings<'a, B> {
    fn clone(&self) -> Self { *self }
}

use std::cmp::Ordering;
/// Two values are equal when their slices are equal, whatever their lifetimes.
impl<'a, 'b, B: Ord> PartialEq<Greetings<'a, B>> for Greetings<'b, B> {
    fn eq(&self, other: &Greetings<'a, B>) -> bool {
        self.slice.eq(other.slice)
    }
}
impl<'a, B: Ord> Eq for Greetings<'a, B> { }
/// Values are ordered by comparing their slices, whatever their lifetimes.
impl<'a, 'b, B: Ord> PartialOrd<Greetings<'a, B>> for Greetings<'b, B> {
    fn partial_cmp(&self, other: &Greetings<'a, B>) -> Option<Ordering> {
        self.slice.partial_cmp(other.slice)
    }
}
/// Total order given by the slices' own `Ord` implementation.
impl<'a, B: Ord> Ord for Greetings<'a, B> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Compare through `Ord` directly: `B: Ord` already guarantees a total order,
        // so there is no need to go through `partial_cmp` and unwrap its result.
        self.slice.cmp(other.slice)
    }
}
574
575 impl<'a, B: Ord + Clone> MyTrait<'a> for Greetings<'a, B> {
576 type Owned = Vec<B>;
577 fn into_owned(self) -> Self::Owned { self.slice.to_vec() }
578 fn clone_onto(&self, other: &mut Self::Owned) {
579 self.slice.clone_into(other);
580 }
581 fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering {
582 self.slice.cmp(&other[..])
583 }
584 fn borrow_as(other: &'a Self::Owned) -> Self {
585 Self {
586 text: None,
587 slice: &other[..],
588 }
589 }
590 }
591
592
593
594 impl<B> BatchContainer for SliceContainer2<B>
595 where
596 B: Ord + Clone + Sized + 'static,
597 {
598 type PushItem = Vec<B>;
599 type ReadItem<'a> = Greetings<'a, B>;
600 fn push(&mut self, item: Vec<B>) {
601 for x in item.into_iter() {
602 self.inner.push(x);
603 }
604 self.offsets.push(self.inner.len());
605 }
606 fn copy_push(&mut self, item: &Vec<B>) {
607 self.copy(<_ as MyTrait>::borrow_as(item));
608 }
609 fn copy(&mut self, item: Self::ReadItem<'_>) {
610 for x in item.slice.iter() {
611 self.inner.copy(x);
612 }
613 self.offsets.push(self.inner.len());
614 }
615 fn copy_slice(&mut self, slice: &[Vec<B>]) {
616 for item in slice {
617 self.copy_push(item);
618 }
619 }
620 fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
621 for index in start .. end {
622 self.copy(other.index(index));
623 }
624 }
625 fn with_capacity(size: usize) -> Self {
626 let mut offsets = Vec::with_capacity(size + 1);
627 offsets.push(0);
628 Self {
629 text: format!("Hello!"),
630 offsets,
631 inner: Vec::with_capacity(size),
632 }
633 }
634 fn reserve(&mut self, _additional: usize) {
635 }
636 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
637 let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1);
638 offsets.push(0);
639 Self {
640 text: format!("Hello!"),
641 offsets,
642 inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()),
643 }
644 }
645 fn index(&self, index: usize) -> Self::ReadItem<'_> {
646 let lower = self.offsets[index];
647 let upper = self.offsets[index+1];
648 Greetings {
649 text: Some(&self.text),
650 slice: &self.inner[lower .. upper],
651 }
652 }
653 fn len(&self) -> usize {
654 self.offsets.len() - 1
655 }
656 }
657
658 impl<B> Default for SliceContainer2<B> {
660 fn default() -> Self {
661 Self {
662 text: format!("Hello!"),
663 offsets: vec![0],
664 inner: Default::default(),
665 }
666 }
667 }
668}