differential_dataflow/trace/implementations/
chunker.rs1use std::collections::VecDeque;
4
5use columnation::Columnation;
6use timely::Container;
7use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer};
8
9use crate::containers::TimelyStack;
10use crate::consolidation::{consolidate_updates, Consolidate};
11use crate::difference::Semigroup;
12
13pub struct ColumnationChunker<T: Columnation> {
20 pending: Vec<T>,
21 ready: VecDeque<TimelyStack<T>>,
22 empty: Option<TimelyStack<T>>,
23}
24
25impl<T: Columnation> Default for ColumnationChunker<T> {
26 fn default() -> Self {
27 Self {
28 pending: Vec::default(),
29 ready: VecDeque::default(),
30 empty: None,
31 }
32 }
33}
34
35impl<D,T,R> ColumnationChunker<(D, T, R)>
36where
37 D: Columnation + Ord,
38 T: Columnation + Ord,
39 R: Columnation + Semigroup,
40{
41 const BUFFER_SIZE_BYTES: usize = 64 << 10;
42 fn chunk_capacity() -> usize {
43 let size = ::std::mem::size_of::<(D, T, R)>();
44 if size == 0 {
45 Self::BUFFER_SIZE_BYTES
46 } else if size <= Self::BUFFER_SIZE_BYTES {
47 Self::BUFFER_SIZE_BYTES / size
48 } else {
49 1
50 }
51 }
52
53 fn form_chunk(&mut self) {
54 consolidate_updates(&mut self.pending);
55 if self.pending.len() >= Self::chunk_capacity() {
56 while self.pending.len() > Self::chunk_capacity() {
57 let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
58 for item in self.pending.drain(..chunk.capacity()) {
59 chunk.copy(&item);
60 }
61 self.ready.push_back(chunk);
62 }
63 }
64 }
65}
66
67impl<'a, D, T, R> PushInto<&'a mut Vec<(D, T, R)>> for ColumnationChunker<(D, T, R)>
68where
69 D: Columnation + Ord + Clone,
70 T: Columnation + Ord + Clone,
71 R: Columnation + Semigroup + Clone,
72{
73 fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) {
74 if self.pending.capacity() < Self::chunk_capacity() * 2 {
75 self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
76 }
77
78 let mut drain = container.drain(..).peekable();
79 while drain.peek().is_some() {
80 self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
81 if self.pending.len() == self.pending.capacity() {
82 self.form_chunk();
83 }
84 }
85 }
86}
87
88impl<D, T, R> ContainerBuilder for ColumnationChunker<(D, T, R)>
89where
90 D: Columnation + Ord + Clone + 'static,
91 T: Columnation + Ord + Clone + 'static,
92 R: Columnation + Semigroup + Clone + 'static,
93{
94 type Container = TimelyStack<(D,T,R)>;
95
96 fn extract(&mut self) -> Option<&mut Self::Container> {
97 if let Some(ready) = self.ready.pop_front() {
98 self.empty = Some(ready);
99 self.empty.as_mut()
100 } else {
101 None
102 }
103 }
104
105 fn finish(&mut self) -> Option<&mut Self::Container> {
106 consolidate_updates(&mut self.pending);
107 while !self.pending.is_empty() {
108 let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
109 for item in self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())) {
110 chunk.copy(&item);
111 }
112 self.ready.push_back(chunk);
113 }
114 self.empty = self.ready.pop_front();
115 self.empty.as_mut()
116 }
117}
118
/// Accumulates items into `Output` containers and queues consolidated containers.
///
/// Items are pushed into `pending`; containers that are done wait in `ready` until
/// they are handed out one at a time.
pub struct ContainerChunker<Output> {
    /// Container currently being filled.
    pending: Output,
    /// Finished containers, oldest at the front, waiting to be handed out.
    ready: VecDeque<Output>,
    /// Spare container: target for consolidation, and owner of the container most
    /// recently handed out.
    empty: Output,
}
125
126impl<Output: Default> Default for ContainerChunker<Output> {
127 fn default() -> Self {
128 Self {
129 pending: Output::default(),
130 ready: VecDeque::default(),
131 empty: Output::default(),
132 }
133 }
134}
135
136impl<'a, Input, Output> PushInto<&'a mut Input> for ContainerChunker<Output>
137where
138 Input: DrainContainer,
139 Output: Default
140 + SizableContainer
141 + Consolidate
142 + PushInto<Input::Item<'a>>,
143{
144 fn push_into(&mut self, container: &'a mut Input) {
145 self.pending.ensure_capacity(&mut None);
146
147 for item in container.drain() {
148 self.pending.push_into(item);
149 if self.pending.at_capacity() {
150 let starting_len = self.pending.len();
151 self.pending.consolidate_into(&mut self.empty);
152 std::mem::swap(&mut self.pending, &mut self.empty);
153 self.empty.clear();
154 if self.pending.len() > starting_len / 2 {
155 self.ready.push_back(std::mem::take(&mut self.pending));
159 }
160 }
161 }
162 }
163}
164
165impl<Output> ContainerBuilder for ContainerChunker<Output>
166where
167 Output: SizableContainer + Consolidate + Container,
168{
169 type Container = Output;
170
171 fn extract(&mut self) -> Option<&mut Self::Container> {
172 if let Some(ready) = self.ready.pop_front() {
173 self.empty = ready;
174 Some(&mut self.empty)
175 } else {
176 None
177 }
178 }
179
180 fn finish(&mut self) -> Option<&mut Self::Container> {
181 if !self.pending.is_empty() {
182 self.pending.consolidate_into(&mut self.empty);
183 std::mem::swap(&mut self.pending, &mut self.empty);
184 self.empty.clear();
185 if !self.pending.is_empty() {
186 self.ready.push_back(std::mem::take(&mut self.pending));
187 }
188 }
189 if let Some(ready) = self.ready.pop_front() {
190 self.empty = ready;
191 Some(&mut self.empty)
192 } else {
193 None
194 }
195 }
196}