differential_dataflow/columnar/
builder.rs1use std::collections::VecDeque;
8use columnar::{Columnar, Clear, Len, Push};
9
10use super::layout::ColumnarUpdate as Update;
11use super::updates::UpdatesTyped;
12use super::RecordedUpdates;
13
14type TupleContainer<U> = <(<U as Update>::Key, <U as Update>::Val, <U as Update>::Time, <U as Update>::Diff) as Columnar>::Container;
15
16pub struct ValBuilder<U: Update> {
18 current: TupleContainer<U>,
20 empty: Option<RecordedUpdates<U>>,
22 pending: VecDeque<RecordedUpdates<U>>,
24}
25
26use timely::container::PushInto;
27impl<T, U: Update> PushInto<T> for ValBuilder<U> where TupleContainer<U> : Push<T> {
28 #[inline]
29 fn push_into(&mut self, item: T) {
30 self.current.push(item);
31 if self.current.len() > crate::columnar::LINK_TARGET {
32 use columnar::{Borrow, Index};
33 let records = self.current.len();
34 let mut refs = self.current.borrow().into_index_iter().collect::<Vec<_>>();
35 refs.sort();
36 let updates = UpdatesTyped::form(refs.into_iter()).into();
37 self.pending.push_back(RecordedUpdates { updates, records, consolidated: true });
38 self.current.clear();
39 }
40 }
41}
42
43impl<U: Update> Default for ValBuilder<U> {
44 fn default() -> Self {
45 ValBuilder {
46 current: Default::default(),
47 empty: None,
48 pending: Default::default(),
49 }
50 }
51}
52
53use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
54impl<U: Update> ContainerBuilder for ValBuilder<U> {
55 type Container = RecordedUpdates<U>;
56
57 #[inline]
58 fn extract(&mut self) -> Option<&mut Self::Container> {
59 if let Some(container) = self.pending.pop_front() {
60 self.empty = Some(container);
61 self.empty.as_mut()
62 } else {
63 None
64 }
65 }
66
67 #[inline]
68 fn finish(&mut self) -> Option<&mut Self::Container> {
69 if !self.current.is_empty() {
70 use columnar::{Borrow, Index};
71 let records = self.current.len();
72 let mut refs = self.current.borrow().into_index_iter().collect::<Vec<_>>();
73 refs.sort();
74 let updates = UpdatesTyped::form(refs.into_iter()).into();
75 self.pending.push_back(RecordedUpdates { updates, records, consolidated: true });
76 self.current.clear();
77 }
78 self.empty = self.pending.pop_front();
79 self.empty.as_mut()
80 }
81}
82
83impl<U: Update> LengthPreservingContainerBuilder for ValBuilder<U> { }