swapvec/swapvec.rs
1use std::{
2 collections::VecDeque,
3 fmt::Debug,
4 fs::File,
5};
6
7use serde::{Deserialize, Serialize};
8
9use crate::{
10 checkedfile::BatchWriter,
11 compression::{Compress, CompressBoxedClone},
12 error::SwapVecError,
13 swapveciter::SwapVecIter,
14};
15
/// Effort level handed to the chosen compression algorithm.
///
/// How a level translates into concrete settings depends on
/// the algorithm it is combined with.
#[derive(Debug, Clone, Copy)]
pub enum CompressionLevel {
    /// Higher compression at the cost of more CPU time than `Default`.
    /// Worth considering for large amounts of data
    /// that need heavier compression.
    Slow,
    /// A balanced trade-off between compression ratio and CPU time.
    Default,
    /// Give up some compression ratio in exchange for speed.
    /// Useful for easily compressible data with
    /// many repetitions.
    Fast,
}
32
33/// Configure compression for the temporary
34/// file into which your data might be swapped out.
35#[derive(Debug)]
36#[non_exhaustive]
37pub enum Compression {
38 /// Read more about LZ4 here: [LZ4]
39 /// [LZ4]: https://github.com/lz4/lz4
40 Lz4,
41 /// Deflate, mostly known from gzip.
42 Deflate(CompressionLevel),
43 /// Provide your own compression algortihm by implementing
44 /// `Compress`.
45 Custom(Box<dyn CompressBoxedClone>),
46}
47
48impl Clone for Compression {
49 fn clone(&self) -> Self {
50 match &self {
51 Self::Lz4 => Self::Lz4,
52 Self::Deflate(n) => Self::Deflate(*n),
53 Self::Custom(x) => Self::Custom(x.boxed_clone()),
54 }
55 }
56}
57
58/// Configure when and how the vector should swap.
59///
60/// The file creation will happen after max(swap_after, batch_size)
61/// elements.
62///
63/// Keep in mind, that if the temporary file exists,
64/// after ever batch_size elements, at least one write (syscall)
65/// will happen.
66#[derive(Debug)]
67pub struct SwapVecConfig {
68 /// The vector will create a temporary file and starting to
69 /// swap after so many elements.
70 /// If your elements have a certain size in bytes, you can
71 /// multiply this value to calculate the required storage.
72 ///
73 /// If you want to start swapping with the first batch,
74 /// set to batch_size or smaller.
75 ///
76 /// Default: 32 * 1024 * 1024
77 pub swap_after: usize,
78 /// How many elements at once should be written to disk.
79 /// Keep in mind, that for every batch one hash (`u64`)
80 /// and one bytecount (`usize`)
81 /// will be kept in memory.
82 ///
83 /// One batch write will result in at least one syscall.
84 ///
85 /// Default: 32 * 1024
86 pub batch_size: usize,
87 /// If and how you want to compress your temporary file.
88 /// This might be only useful for data which is compressable,
89 /// like timeseries often are.
90 ///
91 /// Default: No compression
92 pub compression: Option<Compression>,
93}
94
95impl Default for SwapVecConfig {
96 fn default() -> Self {
97 Self {
98 swap_after: 32 * 1024 * 1024,
99 batch_size: 32 * 1024,
100 compression: None,
101 }
102 }
103}
104
105/// An only growing array type
106/// which swaps to disk, based on it's initial configuration.
107///
108/// Create a mutable instance, and then
109/// pass iterators or elements to grow it.
110/// ```rust
111/// let mut bigvec = swapvec::SwapVec::default();
112/// let iterator = (0..9);
113/// bigvec.consume(iterator);
114/// bigvec.push(99);
115/// let new_iterator = bigvec.into_iter();
116/// ```
117pub struct SwapVec<T>
118where
119 for<'a> T: Serialize + Deserialize<'a>,
120{
121 tempfile: Option<BatchWriter<File>>,
122 vector: VecDeque<T>,
123 config: SwapVecConfig,
124}
125
126impl<T: Serialize + for<'a> Deserialize<'a>> Default for SwapVec<T> {
127 fn default() -> Self {
128 Self {
129 tempfile: None,
130 vector: VecDeque::new(),
131 config: SwapVecConfig::default(),
132 }
133 }
134}
135
136impl<T: Serialize + for<'a> Deserialize<'a>> Debug for SwapVec<T> {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 write!(
139 f,
140 "SwapVec {{elements_in_ram: {}, elements_in_file: {}}}",
141 self.vector.len(),
142 self.tempfile.as_ref().map(|x| x.batch_count()).unwrap_or(0) * self.config.batch_size,
143 )
144 }
145}
146
147impl<T> SwapVec<T>
148where
149 for<'a> T: Serialize + Deserialize<'a> + Clone,
150{
151 /// Intialize with non-default configuration.
152 pub fn with_config(config: SwapVecConfig) -> Self {
153 Self {
154 tempfile: None,
155 vector: VecDeque::new(),
156 config,
157 }
158 }
159
160 /// Give away an entire iterator for consumption.
161 /// Might return an error, due to possibly triggered batch flush (IO).
162 pub fn consume(&mut self, it: impl Iterator<Item = T>) -> Result<(), SwapVecError> {
163 for element in it {
164 self.push(element)?;
165 self.after_push_work()?;
166 }
167 Ok(())
168 }
169
170 /// Push a single element.
171 /// Might return an error, due to possibly triggered batch flush (IO).
172 /// Will write at most one batch per insert.
173 /// If `swap_after` is bigger than `batch_size` and a file is created,
174 /// every insert will
175 /// write one batch to disk, until the elements in memory have a count
176 /// smaller than or equal to batch size.
177 pub fn push(&mut self, element: T) -> Result<(), SwapVecError> {
178 self.vector.push_back(element);
179 self.after_push_work()
180 }
181
182 /// Check if enough items have been pushed so that
183 /// the temporary file has been created.
184 /// Will be false if element count is below swap_after and below batch_size
185 pub fn written_to_file(&self) -> bool {
186 self.tempfile.is_some()
187 }
188
189 /// Get the file size in bytes of the temporary file.
190 /// Might do IO and therefore could return some Result.
191 pub fn file_size(&self) -> Option<usize> {
192 self.tempfile.as_ref().map(|f| f.bytes_written())
193 }
194
195 /// Basically int(elements pushed / batch size)
196 pub fn batches_written(&self) -> usize {
197 match self.tempfile.as_ref() {
198 None => 0,
199 Some(f) => f.batch_count(),
200 }
201 }
202
203 fn after_push_work(&mut self) -> Result<(), SwapVecError> {
204 if self.vector.len() <= self.config.batch_size {
205 return Ok(());
206 }
207 if self.tempfile.is_none() && self.vector.len() <= self.config.swap_after {
208 return Ok(());
209 }
210
211 // Flush batch
212 if self.tempfile.is_none() {
213 let tf = tempfile::Builder::new().tempfile_in(".")?.into_file();
214 self.tempfile = Some(BatchWriter::new(tf));
215 }
216 assert!(self.tempfile.is_some());
217 let batch: Vec<_> = self.vector.drain(0..self.config.batch_size).collect();
218
219 let buffer = bincode::serialize(&batch)?;
220 let compressed = self.config.compression.compress(buffer);
221 self.tempfile.as_mut().unwrap().write_batch(&compressed)?;
222 Ok(())
223 }
224}
225
226impl<T: Serialize + for<'a> Deserialize<'a> + Clone> IntoIterator for SwapVec<T> {
227 type Item = Result<T, SwapVecError>;
228 type IntoIter = SwapVecIter<T>;
229
230 fn into_iter(self) -> Self::IntoIter {
231 SwapVecIter::new(self.tempfile, self.vector, self.config)
232 }
233}