1use rayon;
4
5pub trait ChunkBufferBuilder<T: Send>: Default {
7 type Buffer: ChunkBuffer<T>;
9
10 fn build(&self) -> Self::Buffer;
12}
13
14pub trait ChunkBuffer<T: Send>: IntoIterator<Item = T> + rayon::slice::ParallelSliceMut<T> + Send {
16 fn push(&mut self, item: T);
21
22 fn len(&self) -> usize;
24
25 fn is_full(&self) -> bool;
27}
28
/// Builder that creates [`LimitedBuffer`] instances from one shared configuration.
pub struct LimitedBufferBuilder {
    /// Element-count limit handed to every buffer this builder creates.
    buffer_limit: usize,
    /// Flag consulted by `build` to choose which `LimitedBuffer` constructor is used.
    preallocate: bool,
}

impl LimitedBufferBuilder {
    /// Creates a builder from a buffer limit and a preallocation flag.
    ///
    /// Both arguments are stored unchanged.
    pub fn new(buffer_limit: usize, preallocate: bool) -> Self {
        Self {
            preallocate,
            buffer_limit,
        }
    }
}
48
49impl<T: Send> ChunkBufferBuilder<T> for LimitedBufferBuilder {
50 type Buffer = LimitedBuffer<T>;
51
52 fn build(&self) -> Self::Buffer {
53 if self.preallocate {
54 LimitedBuffer::new(self.buffer_limit)
55 } else {
56 LimitedBuffer::with_capacity(self.buffer_limit)
57 }
58 }
59}
60
61impl Default for LimitedBufferBuilder {
62 fn default() -> Self {
63 LimitedBufferBuilder {
64 buffer_limit: usize::MAX,
65 preallocate: false,
66 }
67 }
68}
69
/// Buffer backed by a `Vec` that carries an element-count limit.
pub struct LimitedBuffer<T> {
    /// Number of elements at which the buffer counts as full.
    limit: usize,
    /// Backing storage for the pushed items.
    inner: Vec<T>,
}

impl<T> LimitedBuffer<T> {
    /// Creates an empty buffer with the given `limit` and no reserved storage.
    pub fn new(limit: usize) -> Self {
        let inner = Vec::new();
        Self { limit, inner }
    }

    /// Creates an empty buffer with the given `limit`, reserving room for
    /// `limit` elements up front.
    pub fn with_capacity(limit: usize) -> Self {
        let inner = Vec::with_capacity(limit);
        Self { limit, inner }
    }
}
99
100impl<T: Send> ChunkBuffer<T> for LimitedBuffer<T> {
101 fn push(&mut self, item: T) {
102 self.inner.push(item);
103 }
104
105 fn len(&self) -> usize {
106 self.inner.len()
107 }
108
109 fn is_full(&self) -> bool {
110 self.inner.len() >= self.limit
111 }
112}
113
114impl<T> IntoIterator for LimitedBuffer<T> {
115 type Item = T;
116 type IntoIter = <Vec<T> as IntoIterator>::IntoIter;
117
118 fn into_iter(self) -> Self::IntoIter {
119 self.inner.into_iter()
120 }
121}
122
123impl<T: Send> rayon::slice::ParallelSliceMut<T> for LimitedBuffer<T> {
124 fn as_parallel_slice_mut(&mut self) -> &mut [T] {
125 self.inner.as_mut_slice()
126 }
127}
128
#[cfg(test)]
mod test {
    use super::{ChunkBuffer, ChunkBufferBuilder, LimitedBufferBuilder};

    /// A buffer limited to two items becomes full on the second push and
    /// hands the items back in push order.
    #[test]
    fn test_limited_buffer() {
        let mut buffer = LimitedBufferBuilder::new(2, true).build();

        buffer.push(0);
        assert!(!buffer.is_full());
        buffer.push(1);
        assert!(buffer.is_full());

        let data: Vec<_> = buffer.into_iter().collect();
        assert_eq!(data, [0, 1]);
    }
}
147
148#[cfg(feature = "memory-limit")]
149pub mod mem {
150 use deepsize;
151 use rayon;
152
153 use super::{ChunkBuffer, ChunkBufferBuilder};
154
/// Builder that creates [`MemoryLimitedBuffer`] instances sharing one limit.
pub struct MemoryLimitedBufferBuilder {
    /// Limit handed to every buffer this builder creates.
    buffer_limit: u64,
}

impl MemoryLimitedBufferBuilder {
    /// Creates a builder that passes `buffer_limit` to each buffer it builds.
    pub fn new(buffer_limit: u64) -> Self {
        Self { buffer_limit }
    }
}
169
170 impl<T: Send> ChunkBufferBuilder<T> for MemoryLimitedBufferBuilder
171 where
172 T: deepsize::DeepSizeOf,
173 {
174 type Buffer = MemoryLimitedBuffer<T>;
175
176 fn build(&self) -> Self::Buffer {
177 MemoryLimitedBuffer::new(self.buffer_limit)
178 }
179 }
180
181 impl Default for MemoryLimitedBufferBuilder {
182 fn default() -> Self {
183 MemoryLimitedBufferBuilder { buffer_limit: u64::MAX }
184 }
185 }
186
/// Buffer that keeps a running size total for its items next to a limit.
pub struct MemoryLimitedBuffer<T> {
    /// Size total at which the buffer counts as full.
    limit: u64,
    /// Running total accumulated as items are pushed.
    current_size: u64,
    /// Backing storage for the pushed items.
    inner: Vec<T>,
}

impl<T> MemoryLimitedBuffer<T> {
    /// Creates an empty buffer with the given `limit` and a size total of zero.
    pub fn new(limit: u64) -> Self {
        Self {
            inner: Vec::new(),
            current_size: 0,
            limit,
        }
    }

    /// Returns the size total accumulated so far.
    pub fn mem_size(&self) -> u64 {
        let Self { current_size, .. } = self;
        *current_size
    }
}
212
213 impl<T: Send> ChunkBuffer<T> for MemoryLimitedBuffer<T>
214 where
215 T: deepsize::DeepSizeOf,
216 {
217 fn push(&mut self, item: T) {
218 self.current_size += item.deep_size_of() as u64;
219 self.inner.push(item);
220 }
221
222 fn len(&self) -> usize {
223 self.inner.len()
224 }
225
226 fn is_full(&self) -> bool {
227 self.current_size >= self.limit
228 }
229 }
230
231 impl<T> IntoIterator for MemoryLimitedBuffer<T> {
232 type Item = T;
233 type IntoIter = <Vec<T> as IntoIterator>::IntoIter;
234
235 fn into_iter(self) -> Self::IntoIter {
236 self.inner.into_iter()
237 }
238 }
239
240 impl<T: Send> rayon::slice::ParallelSliceMut<T> for MemoryLimitedBuffer<T> {
241 fn as_parallel_slice_mut(&mut self) -> &mut [T] {
242 self.inner.as_mut_slice()
243 }
244 }
245
#[cfg(test)]
mod test {
    use deepsize;

    use super::{ChunkBuffer, ChunkBufferBuilder, MemoryLimitedBufferBuilder};

    /// Sample item with one inline field and one heap-owning field.
    #[derive(Debug, Clone, PartialEq, Eq, deepsize::DeepSizeOf)]
    struct MyType {
        number: i64,
        string: String,
    }

    /// Two equally sized items fill a buffer whose limit is exactly twice the
    /// size of one item, and come back out in push order.
    #[test]
    fn test_memory_limited_buffer() {
        let mut buffer = MemoryLimitedBufferBuilder::new(76).build();

        let item1 = MyType {
            number: 0,
            string: String::from("hello!"),
        };
        buffer.push(item1.clone());
        // NOTE(review): 38 presumably is 32 bytes for the struct plus 6 bytes
        // of string data on a 64-bit target; confirm for other targets.
        assert_eq!(buffer.mem_size(), 38);
        assert!(!buffer.is_full());

        let item2 = MyType {
            number: 1,
            string: String::from("world!"),
        };
        buffer.push(item2.clone());
        assert_eq!(buffer.mem_size(), 76);
        assert!(buffer.is_full());

        let actual_data: Vec<_> = buffer.into_iter().collect();
        let expected_data = vec![item1, item2];
        assert_eq!(actual_data, expected_data);
    }
}
284}