ext_sort/
buffer.rs

1//! Limited chunk buffer.
2
3use rayon;
4
5/// Limited buffer builder. Creates buffers using provided buffer parameters.
6pub trait ChunkBufferBuilder<T: Send>: Default {
7    /// Building buffer type
8    type Buffer: ChunkBuffer<T>;
9
10    /// Creates a new [`ChunkBuffer`] trait instance.
11    fn build(&self) -> Self::Buffer;
12}
13
14/// Base limited buffer interface. Provides methods for pushing data to the buffer and checking buffer state.
15pub trait ChunkBuffer<T: Send>: IntoIterator<Item = T> + rayon::slice::ParallelSliceMut<T> + Send {
16    /// Adds a new element to the buffer.
17    ///
18    /// # Arguments
19    /// * `item` - Item to be added to the buffer
20    fn push(&mut self, item: T);
21
22    /// Returns the buffer length.
23    fn len(&self) -> usize;
24
25    /// Checks if the buffer reached the limit.
26    fn is_full(&self) -> bool;
27}
28
/// [`LimitedBuffer`] builder.
///
/// Produces buffers that report themselves full once they hold `buffer_limit` elements.
#[derive(Debug, Clone)]
pub struct LimitedBufferBuilder {
    /// Buffer size limit in element count, passed to every built buffer.
    buffer_limit: usize,
    /// Whether built buffers should be preallocated.
    preallocate: bool,
}

impl LimitedBufferBuilder {
    /// Creates a new instance of a builder.
    ///
    /// # Arguments
    /// * `buffer_limit` - Buffer size limit in element count
    /// * `preallocate` - If buffer should be preallocated
    pub fn new(buffer_limit: usize, preallocate: bool) -> Self {
        Self {
            buffer_limit,
            preallocate,
        }
    }
}
48
49impl<T: Send> ChunkBufferBuilder<T> for LimitedBufferBuilder {
50    type Buffer = LimitedBuffer<T>;
51
52    fn build(&self) -> Self::Buffer {
53        if self.preallocate {
54            LimitedBuffer::new(self.buffer_limit)
55        } else {
56            LimitedBuffer::with_capacity(self.buffer_limit)
57        }
58    }
59}
60
61impl Default for LimitedBufferBuilder {
62    fn default() -> Self {
63        LimitedBufferBuilder {
64            buffer_limit: usize::MAX,
65            preallocate: false,
66        }
67    }
68}
69
/// Buffer limited by elements count.
///
/// Items are stored in a `Vec`. The limit is consulted only when checking fullness;
/// pushing past it is not prevented.
#[derive(Debug, Clone)]
pub struct LimitedBuffer<T> {
    /// Element count at which the buffer reports itself full.
    limit: usize,
    /// Buffered items in insertion order.
    inner: Vec<T>,
}

impl<T> LimitedBuffer<T> {
    /// Creates a new buffer instance without allocating storage.
    ///
    /// # Arguments
    /// * `limit` - Buffer elements count limit
    pub fn new(limit: usize) -> Self {
        Self {
            limit,
            inner: Vec::new(),
        }
    }

    /// Creates a new buffer instance with storage reserved for `limit` elements.
    ///
    /// # Arguments
    /// * `limit` - Buffer elements count limit
    ///
    /// # Panics
    /// Panics if the required capacity overflows (see [`Vec::with_capacity`]).
    pub fn with_capacity(limit: usize) -> Self {
        Self {
            limit,
            inner: Vec::with_capacity(limit),
        }
    }
}
99
100impl<T: Send> ChunkBuffer<T> for LimitedBuffer<T> {
101    fn push(&mut self, item: T) {
102        self.inner.push(item);
103    }
104
105    fn len(&self) -> usize {
106        self.inner.len()
107    }
108
109    fn is_full(&self) -> bool {
110        self.inner.len() >= self.limit
111    }
112}
113
114impl<T> IntoIterator for LimitedBuffer<T> {
115    type Item = T;
116    type IntoIter = <Vec<T> as IntoIterator>::IntoIter;
117
118    fn into_iter(self) -> Self::IntoIter {
119        self.inner.into_iter()
120    }
121}
122
123impl<T: Send> rayon::slice::ParallelSliceMut<T> for LimitedBuffer<T> {
124    fn as_parallel_slice_mut(&mut self) -> &mut [T] {
125        self.inner.as_mut_slice()
126    }
127}
128
#[cfg(test)]
mod test {
    use super::{ChunkBuffer, ChunkBufferBuilder, LimitedBufferBuilder};

    /// A two-element buffer becomes full exactly on the second push and yields its
    /// items in insertion order.
    #[test]
    fn test_limited_buffer() {
        let mut buffer = LimitedBufferBuilder::new(2, true).build();

        buffer.push(0);
        assert!(!buffer.is_full());
        buffer.push(1);
        assert!(buffer.is_full());

        let drained: Vec<_> = buffer.into_iter().collect();
        assert_eq!(drained, [0, 1]);
    }
}
147
#[cfg(feature = "memory-limit")]
pub mod mem {
    //! Buffers limited by the memory their items occupy, as measured by `deepsize`.

    use deepsize;
    use rayon;

    use super::{ChunkBuffer, ChunkBufferBuilder};

    /// [`MemoryLimitedBuffer`] builder.
    #[derive(Debug, Clone)]
    pub struct MemoryLimitedBufferBuilder {
        /// Buffer size limit in bytes, passed to every built buffer.
        buffer_limit: u64,
    }

    impl MemoryLimitedBufferBuilder {
        /// Creates a new instance of a builder.
        ///
        /// # Arguments
        /// * `buffer_limit` - Buffer size limit in bytes
        pub fn new(buffer_limit: u64) -> Self {
            Self { buffer_limit }
        }
    }

    impl<T: Send> ChunkBufferBuilder<T> for MemoryLimitedBufferBuilder
    where
        T: deepsize::DeepSizeOf,
    {
        type Buffer = MemoryLimitedBuffer<T>;

        /// Builds an empty [`MemoryLimitedBuffer`] with this builder's byte limit.
        fn build(&self) -> Self::Buffer {
            MemoryLimitedBuffer::new(self.buffer_limit)
        }
    }

    impl Default for MemoryLimitedBufferBuilder {
        /// Returns a builder with an effectively unbounded (`u64::MAX` bytes) limit.
        fn default() -> Self {
            Self { buffer_limit: u64::MAX }
        }
    }

    /// Buffer limited by consumed memory.
    ///
    /// The tracked size is the sum of `deep_size_of()` over all pushed items. The backing
    /// `Vec`'s spare capacity is not counted.
    #[derive(Debug, Clone)]
    pub struct MemoryLimitedBuffer<T> {
        /// Size limit in bytes.
        limit: u64,
        /// Accumulated `deep_size_of()` of all pushed items, in bytes.
        current_size: u64,
        /// Buffered items in insertion order.
        inner: Vec<T>,
    }

    impl<T> MemoryLimitedBuffer<T> {
        /// Creates a new instance of a buffer.
        ///
        /// # Arguments
        /// * `limit` - Buffer size limit in bytes
        pub fn new(limit: u64) -> Self {
            Self {
                limit,
                current_size: 0,
                inner: Vec::new(),
            }
        }

        /// Returns buffer size in bytes.
        pub fn mem_size(&self) -> u64 {
            self.current_size
        }
    }

    impl<T: Send> ChunkBuffer<T> for MemoryLimitedBuffer<T>
    where
        T: deepsize::DeepSizeOf,
    {
        /// Appends `item`, adding its deep size to the tracked total.
        fn push(&mut self, item: T) {
            self.current_size += item.deep_size_of() as u64;
            self.inner.push(item);
        }

        fn len(&self) -> usize {
            self.inner.len()
        }

        /// Full once the tracked size is at or beyond the byte limit.
        fn is_full(&self) -> bool {
            self.current_size >= self.limit
        }
    }

    impl<T> IntoIterator for MemoryLimitedBuffer<T> {
        type Item = T;
        type IntoIter = <Vec<T> as IntoIterator>::IntoIter;

        /// Consumes the buffer, yielding its items in insertion order.
        fn into_iter(self) -> Self::IntoIter {
            self.inner.into_iter()
        }
    }

    impl<T: Send> rayon::slice::ParallelSliceMut<T> for MemoryLimitedBuffer<T> {
        fn as_parallel_slice_mut(&mut self) -> &mut [T] {
            self.inner.as_mut_slice()
        }
    }

    #[cfg(test)]
    mod test {
        use deepsize;

        use super::{ChunkBuffer, ChunkBufferBuilder, MemoryLimitedBufferBuilder};

        #[derive(Debug, Clone, PartialEq, Eq, deepsize::DeepSizeOf)]
        struct MyType {
            number: i64,
            string: String,
        }

        #[test]
        fn test_memory_limited_buffer() {
            let builder = MemoryLimitedBufferBuilder::new(76);
            let mut buffer = builder.build();

            let item1 = MyType {
                number: 0,               // 8 bytes
                string: "hello!".into(), // 8 + 8 + 8 + 6 = 30 bytes
            };
            buffer.push(item1.clone());
            assert_eq!(buffer.mem_size(), 38);
            assert!(!buffer.is_full());

            let item2 = MyType {
                number: 1,               // 8 bytes
                string: "world!".into(), // 8 + 8 + 8 + 6 = 30 bytes
            };
            buffer.push(item2.clone());
            assert_eq!(buffer.mem_size(), 76);
            assert!(buffer.is_full());

            let actual_data: Vec<_> = buffer.into_iter().collect();
            let expected_data = vec![item1, item2];
            assert_eq!(actual_data, expected_data);
        }
    }
}