Skip to main content

differential_dataflow/trace/implementations/
chunker.rs

1//! Organize streams of data into sorted chunks.
2
3use std::collections::VecDeque;
4
5use columnation::Columnation;
6use timely::Container;
7use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer};
8
9use crate::containers::TimelyStack;
10use crate::consolidation::{consolidate_updates, Consolidate};
11use crate::difference::Semigroup;
12
13/// Chunk a stream of vectors into chains of columnation stacks.
14///
15/// This chunker accumulates into a `Vec` (not a `TimelyStack`) for efficient
16/// in-place sorting and consolidation, then copies the consolidated results
17/// into `TimelyStack` chunks. This avoids the cost of sorting through
18/// columnation indirection.
19pub struct ColumnationChunker<T: Columnation> {
20    pending: Vec<T>,
21    ready: VecDeque<TimelyStack<T>>,
22    empty: Option<TimelyStack<T>>,
23}
24
25impl<T: Columnation> Default for ColumnationChunker<T> {
26    fn default() -> Self {
27        Self {
28            pending: Vec::default(),
29            ready: VecDeque::default(),
30            empty: None,
31        }
32    }
33}
34
35impl<D,T,R> ColumnationChunker<(D, T, R)>
36where
37    D: Columnation + Ord,
38    T: Columnation + Ord,
39    R: Columnation + Semigroup,
40{
41    const BUFFER_SIZE_BYTES: usize = 64 << 10;
42    fn chunk_capacity() -> usize {
43        let size = ::std::mem::size_of::<(D, T, R)>();
44        if size == 0 {
45            Self::BUFFER_SIZE_BYTES
46        } else if size <= Self::BUFFER_SIZE_BYTES {
47            Self::BUFFER_SIZE_BYTES / size
48        } else {
49            1
50        }
51    }
52
53    fn form_chunk(&mut self) {
54        consolidate_updates(&mut self.pending);
55        if self.pending.len() >= Self::chunk_capacity() {
56            while self.pending.len() > Self::chunk_capacity() {
57                let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
58                for item in self.pending.drain(..chunk.capacity()) {
59                    chunk.copy(&item);
60                }
61                self.ready.push_back(chunk);
62            }
63        }
64    }
65}
66
67impl<'a, D, T, R> PushInto<&'a mut Vec<(D, T, R)>> for ColumnationChunker<(D, T, R)>
68where
69    D: Columnation + Ord + Clone,
70    T: Columnation + Ord + Clone,
71    R: Columnation + Semigroup + Clone,
72{
73    fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) {
74        if self.pending.capacity() < Self::chunk_capacity() * 2 {
75            self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
76        }
77
78        let mut drain = container.drain(..).peekable();
79        while drain.peek().is_some() {
80            self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
81            if self.pending.len() == self.pending.capacity() {
82                self.form_chunk();
83            }
84        }
85    }
86}
87
88impl<D, T, R> ContainerBuilder for ColumnationChunker<(D, T, R)>
89where
90    D: Columnation + Ord + Clone + 'static,
91    T: Columnation + Ord + Clone + 'static,
92    R: Columnation + Semigroup + Clone + 'static,
93{
94    type Container = TimelyStack<(D,T,R)>;
95
96    fn extract(&mut self) -> Option<&mut Self::Container> {
97        if let Some(ready) = self.ready.pop_front() {
98            self.empty = Some(ready);
99            self.empty.as_mut()
100        } else {
101            None
102        }
103    }
104
105    fn finish(&mut self) -> Option<&mut Self::Container> {
106        consolidate_updates(&mut self.pending);
107        while !self.pending.is_empty() {
108            let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
109            for item in self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())) {
110                chunk.copy(&item);
111            }
112            self.ready.push_back(chunk);
113        }
114        self.empty = self.ready.pop_front();
115        self.empty.as_mut()
116    }
117}
118
/// Consolidates a stream of input containers into a sequence of `Output` containers.
pub struct ContainerChunker<Output> {
    /// The container currently being filled with incoming items.
    pending: Output,
    /// Consolidated containers waiting to be handed out, oldest at the front.
    ready: VecDeque<Output>,
    /// Spare container: used as the consolidation target, and to hold the
    /// container most recently handed out.
    empty: Output,
}
125
126impl<Output: Default> Default for ContainerChunker<Output> {
127    fn default() -> Self {
128        Self {
129            pending: Output::default(),
130            ready: VecDeque::default(),
131            empty: Output::default(),
132        }
133    }
134}
135
136impl<'a, Input, Output> PushInto<&'a mut Input> for ContainerChunker<Output>
137where
138    Input: DrainContainer,
139    Output: Default
140        + SizableContainer
141        + Consolidate
142        + PushInto<Input::Item<'a>>,
143{
144    fn push_into(&mut self, container: &'a mut Input) {
145        self.pending.ensure_capacity(&mut None);
146
147        for item in container.drain() {
148            self.pending.push_into(item);
149            if self.pending.at_capacity() {
150                let starting_len = self.pending.len();
151                self.pending.consolidate_into(&mut self.empty);
152                std::mem::swap(&mut self.pending, &mut self.empty);
153                self.empty.clear();
154                if self.pending.len() > starting_len / 2 {
155                    // Note that we're pushing non-full containers, which is a deviation from
156                    // other implementation. The reason for this is that we cannot extract
157                    // partial data from `this.pending`. We should revisit this in the future.
158                    self.ready.push_back(std::mem::take(&mut self.pending));
159                }
160            }
161        }
162    }
163}
164
165impl<Output> ContainerBuilder for ContainerChunker<Output>
166where
167    Output: SizableContainer + Consolidate + Container,
168{
169    type Container = Output;
170
171    fn extract(&mut self) -> Option<&mut Self::Container> {
172        if let Some(ready) = self.ready.pop_front() {
173            self.empty = ready;
174            Some(&mut self.empty)
175        } else {
176            None
177        }
178    }
179
180    fn finish(&mut self) -> Option<&mut Self::Container> {
181        if !self.pending.is_empty() {
182            self.pending.consolidate_into(&mut self.empty);
183            std::mem::swap(&mut self.pending, &mut self.empty);
184            self.empty.clear();
185            if !self.pending.is_empty() {
186                self.ready.push_back(std::mem::take(&mut self.pending));
187            }
188        }
189        if let Some(ready) = self.ready.pop_front() {
190            self.empty = ready;
191            Some(&mut self.empty)
192        } else {
193            None
194        }
195    }
196}