swapvec/
swapvec.rs

1use std::{
2    collections::VecDeque,
3    fmt::Debug,
4    fs::File,
5};
6
7use serde::{Deserialize, Serialize};
8
9use crate::{
10    checkedfile::BatchWriter,
11    compression::{Compress, CompressBoxedClone},
12    error::SwapVecError,
13    swapveciter::SwapVecIter,
14};
15
/// Effort level handed to the compression algorithm.
///
/// The concrete meaning of each level depends on the
/// chosen algorithm.
#[derive(Debug, Clone, Copy)]
pub enum CompressionLevel {
    /// Spend more CPU time than [`CompressionLevel::Default`]
    /// to achieve a higher compression ratio.
    /// Might be useful for large amounts of data
    /// which require heavier compression.
    Slow,
    /// A good trade-off between compression ratio and CPU time.
    Default,
    /// Accept worse compression in exchange for speed.
    /// Useful for easily compressible data with
    /// many repetitions.
    Fast,
}
32
33/// Configure compression for the temporary
34/// file into which your data might be swapped out.  
35#[derive(Debug)]
36#[non_exhaustive]
37pub enum Compression {
38    /// Read more about LZ4 here: [LZ4]
39    /// [LZ4]: https://github.com/lz4/lz4
40    Lz4,
41    /// Deflate, mostly known from gzip.
42    Deflate(CompressionLevel),
43    /// Provide your own compression algortihm by implementing
44    /// `Compress`.
45    Custom(Box<dyn CompressBoxedClone>),
46}
47
48impl Clone for Compression {
49    fn clone(&self) -> Self {
50        match &self {
51            Self::Lz4 => Self::Lz4,
52            Self::Deflate(n) => Self::Deflate(*n),
53            Self::Custom(x) => Self::Custom(x.boxed_clone()),
54        }
55    }
56}
57
58/// Configure when and how the vector should swap.
59///
60/// The file creation will happen after max(swap_after, batch_size)
61/// elements.
62///
63/// Keep in mind, that if the temporary file exists,
64/// after ever batch_size elements, at least one write (syscall)
65/// will happen.
66#[derive(Debug)]
67pub struct SwapVecConfig {
68    /// The vector will create a temporary file and starting to
69    /// swap after so many elements.
70    /// If your elements have a certain size in bytes, you can
71    /// multiply this value to calculate the required storage.
72    ///
73    /// If you want to start swapping with the first batch,
74    /// set to batch_size or smaller.
75    ///
76    /// Default: 32 * 1024 * 1024
77    pub swap_after: usize,
78    /// How many elements at once should be written to disk.  
79    /// Keep in mind, that for every batch one hash (`u64`)
80    /// and one bytecount (`usize`)
81    /// will be kept in memory.
82    ///
83    /// One batch write will result in at least one syscall.
84    ///
85    /// Default: 32 * 1024
86    pub batch_size: usize,
87    /// If and how you want to compress your temporary file.  
88    /// This might be only useful for data which is compressable,
89    /// like timeseries often are.
90    ///
91    /// Default: No compression
92    pub compression: Option<Compression>,
93}
94
95impl Default for SwapVecConfig {
96    fn default() -> Self {
97        Self {
98            swap_after: 32 * 1024 * 1024,
99            batch_size: 32 * 1024,
100            compression: None,
101        }
102    }
103}
104
105/// An only growing array type
106/// which swaps to disk, based on it's initial configuration.
107///
108/// Create a mutable instance, and then
109/// pass iterators or elements to grow it.
110/// ```rust
111/// let mut bigvec = swapvec::SwapVec::default();
112/// let iterator = (0..9);
113/// bigvec.consume(iterator);
114/// bigvec.push(99);
115/// let new_iterator = bigvec.into_iter();
116/// ```
117pub struct SwapVec<T>
118where
119    for<'a> T: Serialize + Deserialize<'a>,
120{
121    tempfile: Option<BatchWriter<File>>,
122    vector: VecDeque<T>,
123    config: SwapVecConfig,
124}
125
126impl<T: Serialize + for<'a> Deserialize<'a>> Default for SwapVec<T> {
127    fn default() -> Self {
128        Self {
129            tempfile: None,
130            vector: VecDeque::new(),
131            config: SwapVecConfig::default(),
132        }
133    }
134}
135
136impl<T: Serialize + for<'a> Deserialize<'a>> Debug for SwapVec<T> {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        write!(
139            f,
140            "SwapVec {{elements_in_ram: {}, elements_in_file: {}}}",
141            self.vector.len(),
142            self.tempfile.as_ref().map(|x| x.batch_count()).unwrap_or(0) * self.config.batch_size,
143        )
144    }
145}
146
147impl<T> SwapVec<T>
148where
149    for<'a> T: Serialize + Deserialize<'a> + Clone,
150{
151    /// Intialize with non-default configuration.
152    pub fn with_config(config: SwapVecConfig) -> Self {
153        Self {
154            tempfile: None,
155            vector: VecDeque::new(),
156            config,
157        }
158    }
159
160    /// Give away an entire iterator for consumption.  
161    /// Might return an error, due to possibly triggered batch flush (IO).
162    pub fn consume(&mut self, it: impl Iterator<Item = T>) -> Result<(), SwapVecError> {
163        for element in it {
164            self.push(element)?;
165            self.after_push_work()?;
166        }
167        Ok(())
168    }
169
170    /// Push a single element.
171    /// Might return an error, due to possibly triggered batch flush (IO).
172    /// Will write at most one batch per insert.
173    /// If `swap_after` is bigger than `batch_size` and a file is created,
174    /// every insert will
175    /// write one batch to disk, until the elements in memory have a count
176    /// smaller than or equal to batch size.
177    pub fn push(&mut self, element: T) -> Result<(), SwapVecError> {
178        self.vector.push_back(element);
179        self.after_push_work()
180    }
181
182    /// Check if enough items have been pushed so that
183    /// the temporary file has been created.  
184    /// Will be false if element count is below swap_after and below batch_size
185    pub fn written_to_file(&self) -> bool {
186        self.tempfile.is_some()
187    }
188
189    /// Get the file size in bytes of the temporary file.
190    /// Might do IO and therefore could return some Result.
191    pub fn file_size(&self) -> Option<usize> {
192        self.tempfile.as_ref().map(|f| f.bytes_written())
193    }
194
195    /// Basically int(elements pushed / batch size)
196    pub fn batches_written(&self) -> usize {
197        match self.tempfile.as_ref() {
198            None => 0,
199            Some(f) => f.batch_count(),
200        }
201    }
202
203    fn after_push_work(&mut self) -> Result<(), SwapVecError> {
204        if self.vector.len() <= self.config.batch_size {
205            return Ok(());
206        }
207        if self.tempfile.is_none() && self.vector.len() <= self.config.swap_after {
208            return Ok(());
209        }
210
211        // Flush batch
212        if self.tempfile.is_none() {
213            let tf = tempfile::Builder::new().tempfile_in(".")?.into_file();
214            self.tempfile = Some(BatchWriter::new(tf));
215        }
216        assert!(self.tempfile.is_some());
217        let batch: Vec<_> = self.vector.drain(0..self.config.batch_size).collect();
218
219        let buffer = bincode::serialize(&batch)?;
220        let compressed = self.config.compression.compress(buffer);
221        self.tempfile.as_mut().unwrap().write_batch(&compressed)?;
222        Ok(())
223    }
224}
225
226impl<T: Serialize + for<'a> Deserialize<'a> + Clone> IntoIterator for SwapVec<T> {
227    type Item = Result<T, SwapVecError>;
228    type IntoIter = SwapVecIter<T>;
229
230    fn into_iter(self) -> Self::IntoIter {
231        SwapVecIter::new(self.tempfile, self.vector, self.config)
232    }
233}