1use std::collections::VecDeque;
2use std::fs::File;
3
4use serde::{Deserialize, Serialize};
5
6use crate::checkedfile::{BatchReader, BatchWriter};
7use crate::compression::Compress;
8use crate::error::SwapVecError;
9use crate::swapvec::SwapVecConfig;
10
/// Thin wrapper around a [`VecDeque`] used for positional element lookup.
///
/// The struct itself carries no trait bounds; the `Clone` requirement is
/// stated only on the impl blocks whose methods actually clone elements.
struct VecDequeIndex<T> {
    /// The wrapped deque, addressed by logical position (front is index 0).
    value: VecDeque<T>,
}
14
15impl<T: Clone> From<VecDeque<T>> for VecDequeIndex<T> {
16 fn from(value: VecDeque<T>) -> Self {
17 Self { value }
18 }
19}
20
21impl<T: Clone> VecDequeIndex<T> {
22 fn get(&self, i: usize) -> Option<T> {
23 let (a, b) = self.value.as_slices();
24 if i < a.len() {
25 a.get(i).cloned()
26 } else {
27 b.get(i - a.len()).cloned()
28 }
29 }
30}
31
32pub struct SwapVecIter<T>
41where
42 for<'a> T: Serialize + Deserialize<'a> + Clone,
43{
44 new_error: Option<std::io::Error>,
47 current_batch_rev: Vec<T>,
48 tempfile: Option<BatchReader<File>>,
49 last_elements: VecDequeIndex<T>,
55 last_elements_index: usize,
56 config: SwapVecConfig,
57}
58
59impl<T: Serialize + for<'a> Deserialize<'a> + Clone> SwapVecIter<T> {
60 pub(crate) fn new(
61 tempfile_written: Option<BatchWriter<File>>,
62 last_elements: VecDeque<T>,
63 config: SwapVecConfig,
64 ) -> Self {
65 let (tempfile, new_error) = match tempfile_written.map(|v| v.try_into()) {
66 None => (None, None),
67 Some(Ok(v)) => (Some(v), None),
68 Some(Err(e)) => (None, Some(e)),
69 };
70
71 let last_elements: VecDequeIndex<_> = last_elements.into();
72 Self {
73 new_error,
74 current_batch_rev: Vec::with_capacity(config.batch_size),
75 last_elements,
76 last_elements_index: 0,
77 tempfile,
78 config,
79 }
80 }
81
82 fn read_batch(&mut self) -> Result<Option<Vec<T>>, SwapVecError> {
83 if self.tempfile.is_none() {
84 return Ok(None);
85 }
86 assert!(self.tempfile.is_some());
87 if let Some(err) = self.new_error.take() {
88 return Err(err.into());
89 }
90
91 let tempfile = self.tempfile.as_mut().unwrap();
92 let buffer = tempfile.read_batch()?;
93 if buffer.is_none() {
94 return Ok(None);
95 }
96 let buffer = buffer.unwrap();
97 let decompressed: Vec<u8> = self
98 .config
99 .compression
100 .decompress(buffer.to_vec())
101 .map_err(|_| SwapVecError::Decompression)?;
102
103 let batch: Vec<T> = bincode::deserialize(&decompressed)?;
104
105 Ok(Some(batch))
106 }
107
108 fn next_in_batch(&mut self) -> Result<Option<T>, SwapVecError> {
109 if let Some(v) = self.current_batch_rev.pop() {
110 return Ok(Some(v));
111 }
112 if let Some(mut new_batch) = self.read_batch()? {
113 new_batch.reverse();
114 self.current_batch_rev = new_batch;
115 Ok(self.current_batch_rev.pop())
116 } else {
117 Ok(None)
118 }
119 }
120
121 pub fn reset(&mut self) {
136 self.current_batch_rev.clear();
137 self.last_elements_index = 0;
138 if let Some(tempfile) = self.tempfile.as_mut() {
139 if let Err(e) = tempfile.reset() {
140 self.new_error = Some(e);
141 }
142 }
143 }
144}
145
146impl<T: Serialize + for<'a> Deserialize<'a> + Clone> Iterator for SwapVecIter<T> {
147 type Item = Result<T, SwapVecError>;
148
149 fn next(&mut self) -> Option<Self::Item> {
150 if let Some(item) = self.current_batch_rev.pop() {
151 return Some(Ok(item));
152 }
153
154 match self.next_in_batch() {
155 Err(err) => Some(Err(err)),
156 Ok(Some(item)) => Some(Ok(item)),
157 Ok(None) => {
158 let index = self.last_elements_index;
159 self.last_elements_index += 1;
160 self.last_elements.get(index).map(|x| Ok(x))
161 }
162 }
163 }
164}