Skip to main content

differential_dataflow/columnar/
builder.rs

//! `ValColBuilder`: the `ContainerBuilder` that feeds the dataflow input side.
//!
//! Accepts flat `(k, v, t, d)` tuples via `PushInto`; when the internal tuple
//! container exceeds a size threshold, sorts + forms a `RecordedUpdates` trie
//! and queues it. `finish` produces one final trie from any remaining tuples.
6
7use std::collections::VecDeque;
8use columnar::{Columnar, Clear, Len, Push};
9
10use super::layout::ColumnarUpdate as Update;
11use super::updates::UpdatesTyped;
12use super::RecordedUpdates;
13
14type TupleContainer<U> = <(<U as Update>::Key, <U as Update>::Val, <U as Update>::Time, <U as Update>::Diff) as Columnar>::Container;
15
16/// A container builder that produces `RecordedUpdates` (sorted, consolidated trie + record count).
17pub struct ValBuilder<U: Update> {
18    /// Container that we're writing to.
19    current: TupleContainer<U>,
20    /// Empty allocation.
21    empty: Option<RecordedUpdates<U>>,
22    /// Completed containers pending to be sent.
23    pending: VecDeque<RecordedUpdates<U>>,
24}
25
26use timely::container::PushInto;
27impl<T, U: Update> PushInto<T> for ValBuilder<U> where TupleContainer<U> : Push<T> {
28    #[inline]
29    fn push_into(&mut self, item: T) {
30        self.current.push(item);
31        if self.current.len() > crate::columnar::LINK_TARGET {
32            use columnar::{Borrow, Index};
33            let records = self.current.len();
34            let mut refs = self.current.borrow().into_index_iter().collect::<Vec<_>>();
35            refs.sort();
36            let updates = UpdatesTyped::form(refs.into_iter()).into();
37            self.pending.push_back(RecordedUpdates { updates, records, consolidated: true });
38            self.current.clear();
39        }
40    }
41}
42
43impl<U: Update> Default for ValBuilder<U> {
44    fn default() -> Self {
45        ValBuilder {
46            current: Default::default(),
47            empty: None,
48            pending: Default::default(),
49        }
50    }
51}
52
53use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
54impl<U: Update> ContainerBuilder for ValBuilder<U> {
55    type Container = RecordedUpdates<U>;
56
57    #[inline]
58    fn extract(&mut self) -> Option<&mut Self::Container> {
59        if let Some(container) = self.pending.pop_front() {
60            self.empty = Some(container);
61            self.empty.as_mut()
62        } else {
63            None
64        }
65    }
66
67    #[inline]
68    fn finish(&mut self) -> Option<&mut Self::Container> {
69        if !self.current.is_empty() {
70            use columnar::{Borrow, Index};
71            let records = self.current.len();
72            let mut refs = self.current.borrow().into_index_iter().collect::<Vec<_>>();
73            refs.sort();
74            let updates = UpdatesTyped::form(refs.into_iter()).into();
75            self.pending.push_back(RecordedUpdates { updates, records, consolidated: true });
76            self.current.clear();
77        }
78        self.empty = self.pending.pop_front();
79        self.empty.as_mut()
80    }
81}
82
83impl<U: Update> LengthPreservingContainerBuilder for ValBuilder<U> { }