swapvec/
swapveciter.rs

1use std::collections::VecDeque;
2use std::fs::File;
3
4use serde::{Deserialize, Serialize};
5
6use crate::checkedfile::{BatchReader, BatchWriter};
7use crate::compression::Compress;
8use crate::error::SwapVecError;
9use crate::swapvec::SwapVecConfig;
10
/// Thin wrapper around a `VecDeque` that hands out owned copies of its
/// elements by logical position.
struct VecDequeIndex<T> {
    value: VecDeque<T>,
}

impl<T> From<VecDeque<T>> for VecDequeIndex<T> {
    fn from(value: VecDeque<T>) -> Self {
        Self { value }
    }
}

impl<T: Clone> VecDequeIndex<T> {
    /// Returns a clone of the element at logical position `i` (0 is the
    /// front of the deque), or `None` if `i` is out of bounds.
    fn get(&self, i: usize) -> Option<T> {
        // `VecDeque::get` already resolves the ring-buffer wrap-around that a
        // manual split over `as_slices()` would have to handle.
        self.value.get(i).cloned()
    }
}
31
32/// Iterator for SwapVec.
33///
34/// Items might be read from disk,
35/// so every item is wrapped in a `Result`.  
36/// The iterator aborts after the first error.
37///
38/// Dropping the iterator removes the temporary file, if existing.  
39/// Also quitting the program should remove the temporary file.
40pub struct SwapVecIter<T>
41where
42    for<'a> T: Serialize + Deserialize<'a> + Clone,
43{
44    // Do not error on new, because into_iter()
45    // is not allowed to fail. Fail at first try then.
46    new_error: Option<std::io::Error>,
47    current_batch_rev: Vec<T>,
48    tempfile: Option<BatchReader<File>>,
49    // last_elements are elements,
50    // which have not been written to disk.
51    // Therefore, for iterating from zero,
52    // first read elements from disk and
53    // then from last_elements.
54    last_elements: VecDequeIndex<T>,
55    last_elements_index: usize,
56    config: SwapVecConfig,
57}
58
59impl<T: Serialize + for<'a> Deserialize<'a> + Clone> SwapVecIter<T> {
60    pub(crate) fn new(
61        tempfile_written: Option<BatchWriter<File>>,
62        last_elements: VecDeque<T>,
63        config: SwapVecConfig,
64    ) -> Self {
65        let (tempfile, new_error) = match tempfile_written.map(|v| v.try_into()) {
66            None => (None, None),
67            Some(Ok(v)) => (Some(v), None),
68            Some(Err(e)) => (None, Some(e)),
69        };
70
71        let last_elements: VecDequeIndex<_> = last_elements.into();
72        Self {
73            new_error,
74            current_batch_rev: Vec::with_capacity(config.batch_size),
75            last_elements,
76            last_elements_index: 0,
77            tempfile,
78            config,
79        }
80    }
81
82    fn read_batch(&mut self) -> Result<Option<Vec<T>>, SwapVecError> {
83        if self.tempfile.is_none() {
84            return Ok(None);
85        }
86        assert!(self.tempfile.is_some());
87        if let Some(err) = self.new_error.take() {
88            return Err(err.into());
89        }
90
91        let tempfile = self.tempfile.as_mut().unwrap();
92        let buffer = tempfile.read_batch()?;
93        if buffer.is_none() {
94            return Ok(None);
95        }
96        let buffer = buffer.unwrap();
97        let decompressed: Vec<u8> = self
98            .config
99            .compression
100            .decompress(buffer.to_vec())
101            .map_err(|_| SwapVecError::Decompression)?;
102
103        let batch: Vec<T> = bincode::deserialize(&decompressed)?;
104
105        Ok(Some(batch))
106    }
107
108    fn next_in_batch(&mut self) -> Result<Option<T>, SwapVecError> {
109        if let Some(v) = self.current_batch_rev.pop() {
110            return Ok(Some(v));
111        }
112        if let Some(mut new_batch) = self.read_batch()? {
113            new_batch.reverse();
114            self.current_batch_rev = new_batch;
115            Ok(self.current_batch_rev.pop())
116        } else {
117            Ok(None)
118        }
119    }
120
121    /// Resets the iteration, starting from the first element.
122    /// If a file exists, it will be read from the beginning.  
123    ///
124    /// To use this feature, you probably don't want to consume
125    /// the iterator (`bigvec.map(|x| x * 2)`), but to use
126    /// [`Iterator::by_ref()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.by_ref)
127    /// ```rust
128    /// let mut bigvec = swapvec::SwapVec::default();
129    /// bigvec.consume(0..99);
130    /// let mut new_iterator = bigvec.into_iter();
131    /// let sum: usize = new_iterator.by_ref().map(|v| v.unwrap()).sum();
132    /// new_iterator.reset();
133    /// let sum_double: usize = new_iterator.by_ref().map(|v| v.unwrap() * 2).sum();
134    /// ```
135    pub fn reset(&mut self) {
136        self.current_batch_rev.clear();
137        self.last_elements_index = 0;
138        if let Some(tempfile) = self.tempfile.as_mut() {
139            if let Err(e) = tempfile.reset() {
140                self.new_error = Some(e);
141            }
142        }
143    }
144}
145
146impl<T: Serialize + for<'a> Deserialize<'a> + Clone> Iterator for SwapVecIter<T> {
147    type Item = Result<T, SwapVecError>;
148
149    fn next(&mut self) -> Option<Self::Item> {
150        if let Some(item) = self.current_batch_rev.pop() {
151            return Some(Ok(item));
152        }
153
154        match self.next_in_batch() {
155            Err(err) => Some(Err(err)),
156            Ok(Some(item)) => Some(Ok(item)),
157            Ok(None) => {
158                let index = self.last_elements_index;
159                self.last_elements_index += 1;
160                self.last_elements.get(index).map(|x| Ok(x))
161            }
162        }
163    }
164}