swapvec/
swapvec.rs

1use std::{collections::VecDeque, fmt::Debug, fs::File};
2
3use serde::{Deserialize, Serialize};
4
5use crate::{
6    checkedfile::BatchWriter,
7    compression::{Compress, CompressBoxedClone},
8    error::SwapVecError,
9    swapveciter::SwapVecIter,
10};
11
/// Set the compression level of the compression
/// algorithm. This maps to different values
/// depending on the chosen algorithm.
#[derive(Clone, Debug, Copy, PartialEq, Eq, Hash)]
pub enum CompressionLevel {
    /// Slower than default, higher compression.
    /// Might be useful for a big amount of data
    /// which requires heavier compression.
    Slow,
    /// A good balance between compression ratio and CPU time.
    Default,
    /// Accept worse compression for speed.
    /// Useful for easily compressible data with
    /// many repetitions.
    Fast,
}
28
29/// Configure compression for the temporary
30/// file into which your data might be swapped out.  
31#[derive(Debug)]
32#[non_exhaustive]
33pub enum Compression {
34    /// Read more about LZ4 here: [LZ4]
35    /// [LZ4]: https://github.com/lz4/lz4
36    Lz4,
37    /// Deflate, mostly known from gzip.
38    Deflate(CompressionLevel),
39    /// Provide your own compression algortihm by implementing
40    /// `Compress`.
41    Custom(Box<dyn CompressBoxedClone + Send>),
42}
43
44impl Clone for Compression {
45    fn clone(&self) -> Self {
46        match &self {
47            Self::Lz4 => Self::Lz4,
48            Self::Deflate(n) => Self::Deflate(*n),
49            Self::Custom(x) => Self::Custom(x.boxed_clone()),
50        }
51    }
52}
53
54/// Configure when and how the vector should swap.
55///
56/// The file creation will happen after max(swap_after, batch_size)
57/// elements.
58///
59/// Keep in mind, that if the temporary file exists,
60/// after ever batch_size elements, at least one write (syscall)
61/// will happen.
62#[derive(Debug)]
63pub struct SwapVecConfig {
64    /// The vector will create a temporary file and starting to
65    /// swap after so many elements.
66    /// If your elements have a certain size in bytes, you can
67    /// multiply this value to calculate the required storage.
68    ///
69    /// If you want to start swapping with the first batch,
70    /// set to batch_size or smaller.
71    ///
72    /// Default: 32 * 1024 * 1024
73    pub swap_after: usize,
74    /// How many elements at once should be written to disk.  
75    /// Keep in mind, that for every batch one hash (`u64`)
76    /// and one bytecount (`usize`)
77    /// will be kept in memory.
78    ///
79    /// One batch write will result in at least one syscall.
80    ///
81    /// Default: 32 * 1024
82    pub batch_size: usize,
83    /// If and how you want to compress your temporary file.  
84    /// This might be only useful for data which is compressable,
85    /// like timeseries often are.
86    ///
87    /// Default: No compression
88    pub compression: Option<Compression>,
89}
90
91impl Default for SwapVecConfig {
92    fn default() -> Self {
93        Self {
94            swap_after: 32 * 1024 * 1024,
95            batch_size: 32 * 1024,
96            compression: None,
97        }
98    }
99}
100
101/// An only growing array type
102/// which swaps to disk, based on it's initial configuration.
103///
104/// Create a mutable instance, and then
105/// pass iterators or elements to grow it.
106/// ```rust
107/// let mut bigvec = swapvec::SwapVec::default();
108/// let iterator = (0..9);
109/// bigvec.consume(iterator);
110/// bigvec.push(99);
111/// let new_iterator = bigvec.into_iter();
112/// ```
113pub struct SwapVec<T>
114where
115    for<'a> T: Serialize + Deserialize<'a>,
116{
117    tempfile: Option<BatchWriter<File>>,
118    vector: VecDeque<T>,
119    config: SwapVecConfig,
120}
121
122impl<T: Serialize + for<'a> Deserialize<'a>> Default for SwapVec<T> {
123    fn default() -> Self {
124        Self {
125            tempfile: None,
126            vector: VecDeque::new(),
127            config: SwapVecConfig::default(),
128        }
129    }
130}
131
132impl<T: Serialize + for<'a> Deserialize<'a>> Debug for SwapVec<T> {
133    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134        write!(
135            f,
136            "SwapVec {{elements_in_ram: {}, elements_in_file: {}}}",
137            self.vector.len(),
138            self.tempfile.as_ref().map(|x| x.batch_count()).unwrap_or(0) * self.config.batch_size,
139        )
140    }
141}
142
143impl<T> SwapVec<T>
144where
145    for<'a> T: Serialize + Deserialize<'a> + Clone,
146{
147    /// Intialize with non-default configuration.
148    pub fn with_config(config: SwapVecConfig) -> Self {
149        Self {
150            tempfile: None,
151            vector: VecDeque::new(),
152            config,
153        }
154    }
155
156    /// Give away an entire iterator for consumption.  
157    /// Might return an error, due to possibly triggered batch flush (IO).
158    pub fn consume(&mut self, it: impl Iterator<Item = T>) -> Result<(), SwapVecError> {
159        for element in it {
160            self.push(element)?;
161            self.after_push_work()?;
162        }
163        Ok(())
164    }
165
166    /// Push a single element.
167    /// Might return an error, due to possibly triggered batch flush (IO).
168    /// Will write at most one batch per insert.
169    /// If `swap_after` is bigger than `batch_size` and a file is created,
170    /// every insert will
171    /// write one batch to disk, until the elements in memory have a count
172    /// smaller than or equal to batch size.
173    pub fn push(&mut self, element: T) -> Result<(), SwapVecError> {
174        self.vector.push_back(element);
175        self.after_push_work()
176    }
177
178    /// Check if enough items have been pushed so that
179    /// the temporary file has been created.  
180    /// Will be false if element count is below swap_after and below batch_size
181    pub fn written_to_file(&self) -> bool {
182        self.tempfile.is_some()
183    }
184
185    /// Get the file size in bytes of the temporary file.
186    /// Might do IO and therefore could return some Result.
187    pub fn file_size(&self) -> Option<usize> {
188        self.tempfile.as_ref().map(|f| f.bytes_written())
189    }
190
191    /// Basically int(elements pushed / batch size)
192    pub fn batches_written(&self) -> usize {
193        match self.tempfile.as_ref() {
194            None => 0,
195            Some(f) => f.batch_count(),
196        }
197    }
198
199    fn after_push_work(&mut self) -> Result<(), SwapVecError> {
200        if self.vector.len() <= self.config.batch_size {
201            return Ok(());
202        }
203        if self.tempfile.is_none() && self.vector.len() <= self.config.swap_after {
204            return Ok(());
205        }
206
207        // Flush batch
208        if self.tempfile.is_none() {
209            let tf = tempfile::Builder::new().tempfile_in(".")?.into_file();
210            self.tempfile = Some(BatchWriter::new(tf));
211        }
212        assert!(self.tempfile.is_some());
213        let batch: Vec<_> = self.vector.drain(0..self.config.batch_size).collect();
214
215        let buffer = bincode::serialize(&batch)?;
216        let compressed = self.config.compression.compress(buffer);
217        self.tempfile.as_mut().unwrap().write_batch(&compressed)?;
218        Ok(())
219    }
220}
221
222impl<T: Serialize + for<'a> Deserialize<'a> + Clone> IntoIterator for SwapVec<T> {
223    type Item = Result<T, SwapVecError>;
224    type IntoIter = SwapVecIter<T>;
225
226    fn into_iter(self) -> Self::IntoIter {
227        SwapVecIter::new(self.tempfile, self.vector, self.config)
228    }
229}