swapvec/swapvec.rs
1use std::{collections::VecDeque, fmt::Debug, fs::File};
2
3use serde::{Deserialize, Serialize};
4
5use crate::{
6 checkedfile::BatchWriter,
7 compression::{Compress, CompressBoxedClone},
8 error::SwapVecError,
9 swapveciter::SwapVecIter,
10};
11
/// Compression level of the chosen compression algorithm.
///
/// Each level maps to a different concrete value
/// depending on the algorithm it is used with.
///
/// Levels are plain values: they can be copied and compared.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompressionLevel {
    /// Slower than default, higher compression.
    /// Might be useful for large amounts of data
    /// which require heavier compression.
    Slow,
    /// A good trade-off between compression ratio and CPU time.
    Default,
    /// Accept worse compression for speed.
    /// Useful for easily compressible data with
    /// many repetitions.
    Fast,
}
28
/// Configure compression for the temporary
/// file into which your data might be swapped out.
#[derive(Debug)]
#[non_exhaustive]
pub enum Compression {
    /// LZ4 compression. Read more about LZ4 here: [LZ4]
    ///
    /// [LZ4]: https://github.com/lz4/lz4
    Lz4,
    /// Deflate, mostly known from gzip.
    /// The payload selects the compression level.
    Deflate(CompressionLevel),
    /// Provide your own compression algorithm by implementing
    /// `Compress`. The boxed value must be `CompressBoxedClone + Send`.
    Custom(Box<dyn CompressBoxedClone + Send>),
}
43
44impl Clone for Compression {
45 fn clone(&self) -> Self {
46 match &self {
47 Self::Lz4 => Self::Lz4,
48 Self::Deflate(n) => Self::Deflate(*n),
49 Self::Custom(x) => Self::Custom(x.boxed_clone()),
50 }
51 }
52}
53
/// Configure when and how the vector should swap.
///
/// The file creation will happen after max(swap_after, batch_size)
/// elements.
///
/// Keep in mind that once the temporary file exists,
/// at least one write (syscall) will happen
/// after every batch_size elements.
#[derive(Debug)]
pub struct SwapVecConfig {
    /// The vector will create a temporary file and start to
    /// swap after this many elements.
    /// If your elements have a certain size in bytes, you can
    /// multiply this value by that size to calculate the required storage.
    ///
    /// If you want to start swapping with the first batch,
    /// set this to batch_size or smaller.
    ///
    /// Default: 32 * 1024 * 1024
    pub swap_after: usize,
    /// How many elements should be written to disk at once.
    /// Keep in mind that for every batch one hash (`u64`)
    /// and one byte count (`usize`)
    /// will be kept in memory.
    ///
    /// One batch write will result in at least one syscall.
    ///
    /// Default: 32 * 1024
    pub batch_size: usize,
    /// If and how you want to compress your temporary file.
    /// This might only be useful for data which is compressible,
    /// like time series often are.
    ///
    /// Default: No compression
    pub compression: Option<Compression>,
}
90
91impl Default for SwapVecConfig {
92 fn default() -> Self {
93 Self {
94 swap_after: 32 * 1024 * 1024,
95 batch_size: 32 * 1024,
96 compression: None,
97 }
98 }
99}
100
101/// An only growing array type
102/// which swaps to disk, based on it's initial configuration.
103///
104/// Create a mutable instance, and then
105/// pass iterators or elements to grow it.
106/// ```rust
107/// let mut bigvec = swapvec::SwapVec::default();
108/// let iterator = (0..9);
109/// bigvec.consume(iterator);
110/// bigvec.push(99);
111/// let new_iterator = bigvec.into_iter();
112/// ```
113pub struct SwapVec<T>
114where
115 for<'a> T: Serialize + Deserialize<'a>,
116{
117 tempfile: Option<BatchWriter<File>>,
118 vector: VecDeque<T>,
119 config: SwapVecConfig,
120}
121
122impl<T: Serialize + for<'a> Deserialize<'a>> Default for SwapVec<T> {
123 fn default() -> Self {
124 Self {
125 tempfile: None,
126 vector: VecDeque::new(),
127 config: SwapVecConfig::default(),
128 }
129 }
130}
131
132impl<T: Serialize + for<'a> Deserialize<'a>> Debug for SwapVec<T> {
133 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134 write!(
135 f,
136 "SwapVec {{elements_in_ram: {}, elements_in_file: {}}}",
137 self.vector.len(),
138 self.tempfile.as_ref().map(|x| x.batch_count()).unwrap_or(0) * self.config.batch_size,
139 )
140 }
141}
142
143impl<T> SwapVec<T>
144where
145 for<'a> T: Serialize + Deserialize<'a> + Clone,
146{
147 /// Intialize with non-default configuration.
148 pub fn with_config(config: SwapVecConfig) -> Self {
149 Self {
150 tempfile: None,
151 vector: VecDeque::new(),
152 config,
153 }
154 }
155
156 /// Give away an entire iterator for consumption.
157 /// Might return an error, due to possibly triggered batch flush (IO).
158 pub fn consume(&mut self, it: impl Iterator<Item = T>) -> Result<(), SwapVecError> {
159 for element in it {
160 self.push(element)?;
161 self.after_push_work()?;
162 }
163 Ok(())
164 }
165
166 /// Push a single element.
167 /// Might return an error, due to possibly triggered batch flush (IO).
168 /// Will write at most one batch per insert.
169 /// If `swap_after` is bigger than `batch_size` and a file is created,
170 /// every insert will
171 /// write one batch to disk, until the elements in memory have a count
172 /// smaller than or equal to batch size.
173 pub fn push(&mut self, element: T) -> Result<(), SwapVecError> {
174 self.vector.push_back(element);
175 self.after_push_work()
176 }
177
178 /// Check if enough items have been pushed so that
179 /// the temporary file has been created.
180 /// Will be false if element count is below swap_after and below batch_size
181 pub fn written_to_file(&self) -> bool {
182 self.tempfile.is_some()
183 }
184
185 /// Get the file size in bytes of the temporary file.
186 /// Might do IO and therefore could return some Result.
187 pub fn file_size(&self) -> Option<usize> {
188 self.tempfile.as_ref().map(|f| f.bytes_written())
189 }
190
191 /// Basically int(elements pushed / batch size)
192 pub fn batches_written(&self) -> usize {
193 match self.tempfile.as_ref() {
194 None => 0,
195 Some(f) => f.batch_count(),
196 }
197 }
198
199 fn after_push_work(&mut self) -> Result<(), SwapVecError> {
200 if self.vector.len() <= self.config.batch_size {
201 return Ok(());
202 }
203 if self.tempfile.is_none() && self.vector.len() <= self.config.swap_after {
204 return Ok(());
205 }
206
207 // Flush batch
208 if self.tempfile.is_none() {
209 let tf = tempfile::Builder::new().tempfile_in(".")?.into_file();
210 self.tempfile = Some(BatchWriter::new(tf));
211 }
212 assert!(self.tempfile.is_some());
213 let batch: Vec<_> = self.vector.drain(0..self.config.batch_size).collect();
214
215 let buffer = bincode::serialize(&batch)?;
216 let compressed = self.config.compression.compress(buffer);
217 self.tempfile.as_mut().unwrap().write_batch(&compressed)?;
218 Ok(())
219 }
220}
221
222impl<T: Serialize + for<'a> Deserialize<'a> + Clone> IntoIterator for SwapVec<T> {
223 type Item = Result<T, SwapVecError>;
224 type IntoIter = SwapVecIter<T>;
225
226 fn into_iter(self) -> Self::IntoIter {
227 SwapVecIter::new(self.tempfile, self.vector, self.config)
228 }
229}