1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
use std::{
    collections::{hash_map::DefaultHasher, VecDeque},
    fmt::Debug,
    fs::File,
    hash::{Hash, Hasher},
    io::Write,
    os::fd::AsRawFd,
};

use serde::{Deserialize, Serialize};

use crate::{error::SwapVecError, swapveciter::SwapVecIter};

/// Configure compression for the temporary
/// file into which your data might be swapped out.  
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum Compression {
    /// Read more about LZ4 here: [LZ4]
    /// [LZ4]: https://github.com/lz4/lz4
    Lz4,
}

/// Configure when and how the vector should swap.
///
/// The file creation will happen after max(swap_after, batch_size)
/// elements.
///
/// Keep in mind, that if the temporary file exists,
/// after ever batch_size elements, at least one write (syscall)
/// will happen.
#[derive(Debug, Clone)]
pub struct SwapVecConfig {
    /// The vector will create a temporary file and starting to
    /// swap after so many elements.
    /// If your elements have a certain size in bytes, you can
    /// multiply this value to calculate the required storage.
    ///
    /// If you want to start swapping with the first batch,
    /// set to batch_size or smaller.
    ///
    /// Default: 32 * 1024 * 1024
    pub swap_after: usize,
    /// How many elements at once should be written to disk.  
    /// Keep in mind, that for every batch one hash (`u64`)
    /// and one bytecount (`usize`)
    /// will be kept in memory.
    ///
    /// One batch write will result in at least one syscall.
    ///
    /// Default: 32 * 1024
    pub batch_size: usize,
    /// If and how you want to compress your temporary file.  
    /// This might be only useful for data which is compressable,
    /// like timeseries often are.
    ///
    /// Default: No compression
    pub compression: Option<Compression>,
}

impl Default for SwapVecConfig {
    fn default() -> Self {
        Self {
            swap_after: 32 * 1024 * 1024,
            batch_size: 32 * 1024,
            compression: None,
        }
    }
}

pub struct BatchInfo {
    pub hash: u64,
    pub bytes: usize,
}

pub struct CheckedFile {
    pub file: File,
    pub batch_info: Vec<BatchInfo>,
}

impl CheckedFile {
    fn write_all(&mut self, buffer: &Vec<u8>) -> Result<(), std::io::Error> {
        let mut hasher = DefaultHasher::new();
        buffer.hash(&mut hasher);
        let _res = self.file.write_all(buffer);
        self.batch_info.push(BatchInfo {
            hash: hasher.finish(),
            bytes: buffer.len(),
        });
        self.file.flush()
    }
}

/// An only growing array type
/// which swaps to disk, based on it's initial configuration.
///
/// Create a mutable instance, and then
/// pass iterators or elements to grow it.
/// ```rust
/// let mut bigvec = swapvec::SwapVec::default();
/// let iterator = (0..9);
/// bigvec.consume(iterator);
/// bigvec.push(99);
/// let new_iterator = bigvec.into_iter();
/// ```
pub struct SwapVec<T>
where
    for<'a> T: Serialize + Deserialize<'a>,
{
    tempfile: Option<CheckedFile>,
    vector: VecDeque<T>,
    config: SwapVecConfig,
}

impl<T: Serialize + for<'a> Deserialize<'a>> Default for SwapVec<T> {
    fn default() -> Self {
        Self {
            tempfile: None,
            vector: VecDeque::new(),
            config: SwapVecConfig::default(),
        }
    }
}

impl<T: Serialize + for<'a> Deserialize<'a>> Debug for SwapVec<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "SwapVec {{elements_in_ram: {}, elements_in_file: {}, filedescriptor: {:#?}}}",
            self.vector.len(),
            self.tempfile
                .as_ref()
                .map(|x| x.batch_info.len())
                .unwrap_or(0)
                * self.config.batch_size,
            self.tempfile.as_ref().map(|x| x.file.as_raw_fd())
        )
    }
}

impl<T> SwapVec<T>
where
    for<'a> T: Serialize + Deserialize<'a> + Hash,
{
    /// Intialize with non-default configuration.
    pub fn with_config(config: SwapVecConfig) -> Self {
        Self {
            tempfile: None,
            vector: VecDeque::new(),
            config,
        }
    }

    /// Give away an entire iterator for consumption.  
    /// Might return an error, due to possibly triggered batch flush (IO).
    pub fn consume(&mut self, it: impl Iterator<Item = T>) -> Result<(), SwapVecError> {
        for element in it {
            self.push(element)?;
            self.after_push_work()?;
        }
        Ok(())
    }

    /// Push a single element  
    /// Might return an error, due to possibly triggered batch flush (IO).
    pub fn push(&mut self, element: T) -> Result<(), SwapVecError> {
        self.vector.push_back(element);
        self.after_push_work()
    }

    /// Check if a file has been created.  
    /// Is false if element count is below swap_after and below batch_size
    pub fn written_to_file(&self) -> bool {
        self.tempfile.is_some()
    }

    /// Get the file size in bytes of the temporary file.
    /// Might do IO and therefore could return some Result.
    pub fn file_size(&self) -> Option<Result<u64, SwapVecError>> {
        match self.tempfile.as_ref() {
            None => None,
            Some(f) => match f.file.metadata() {
                Err(err) => Some(Err(err.into())),
                Ok(meta) => Some(Ok(meta.len())),
            },
        }
    }

    /// Basically elements pushed // batch_size
    pub fn batches_written(&self) -> usize {
        match self.tempfile.as_ref() {
            None => 0,
            Some(f) => f.batch_info.len(),
        }
    }

    fn after_push_work(&mut self) -> Result<(), SwapVecError> {
        if self.vector.len() < self.config.batch_size {
            return Ok(());
        }
        if self.tempfile.is_none() && self.vector.len() < self.config.swap_after {
            return Ok(());
        }
        // Do action
        if self.tempfile.is_none() {
            let tf = tempfile::tempfile()?;
            self.tempfile = Some(CheckedFile {
                file: tf,
                batch_info: Vec::new(),
            })
        }

        let batch: Vec<T> = (0..self.config.batch_size)
            .map(|_| self.vector.pop_front().unwrap())
            .collect::<Vec<_>>();

        let mut batch_hash = DefaultHasher::new();
        batch.iter().for_each(|x| x.hash(&mut batch_hash));

        let buffer = bincode::serialize(&batch)?;
        self.tempfile.as_mut().unwrap().write_all(&buffer)?;
        Ok(())
    }
}

impl<T: Serialize + for<'a> Deserialize<'a> + Hash> IntoIterator for SwapVec<T> {
    type Item = Result<T, SwapVecError>;
    type IntoIter = SwapVecIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        SwapVecIter::new(self.tempfile, self.vector, self.config)
    }
}