bitkv_rs/
iterator.rs

1use bytes::Bytes;
2use parking_lot::RwLock;
3use std::sync::Arc;
4
5use crate::{db::Engine, errors::Result, index::IndexIterator, option::IteratorOptions};
6
7/// Iterator interface
8pub struct Iterator<'a> {
9  index_iter: Arc<RwLock<Box<dyn IndexIterator>>>, // index iterator
10  engine: &'a Engine,
11}
12
13impl Engine {
14  /// Create a new iterator
15  pub fn iter(&self, options: IteratorOptions) -> Iterator {
16    Iterator {
17      index_iter: Arc::new(RwLock::new(self.index.iterator(options))),
18      engine: self,
19    }
20  }
21
22  /// list all keys in db
23  pub fn list_keys(&self) -> Result<Vec<Bytes>> {
24    self.index.list_keys()
25  }
26
27  /// operate on all key-value pairs in db, finish when `f` returns false
28  pub fn fold<F>(&self, f: F) -> Result<()>
29  where
30    Self: Sized,
31    F: Fn(Bytes, Bytes) -> bool,
32  {
33    let iter = self.iter(IteratorOptions::default());
34    while let Some((key, value)) = iter.next() {
35      if !f(key, value) {
36        break;
37      }
38    }
39    Ok(())
40  }
41}
42
43impl Iterator<'_> {
44  // `Rewind` go back to the beginning of the iterator
45  pub fn rewind(&self) {
46    let mut index_iter = self.index_iter.write();
47    index_iter.rewind();
48  }
49
50  // `Seek` search for the first entry with a key greater than or equal to the given key
51  pub fn seek(&self, key: Vec<u8>) {
52    let mut index_iter = self.index_iter.write();
53    index_iter.seek(key);
54  }
55
56  // `Next` move to the next entry, when the iterator is exhausted, return None
57  pub fn next(&self) -> Option<(Bytes, Bytes)> {
58    let mut index_iter = self.index_iter.write();
59    if let Some(item) = index_iter.next() {
60      let val = self
61        .engine
62        .get_value_by_position(item.1)
63        .expect("failed to get value from data file");
64      return Some((Bytes::from(item.0.to_vec()), val));
65    }
66    None
67  }
68}
69
70#[cfg(test)]
71mod tests {
72  use std::path::PathBuf;
73
74  use crate::{option::Options, util};
75
76  use super::*;
77
78  #[test]
79  fn test_iterator_fold() {
80    let mut opt = Options::default();
81    opt.dir_path = PathBuf::from("/tmp/bitkv-rs-iter-fold");
82    opt.data_file_size = 64 * 1024 * 1024; // 64MB
83    let engine = Engine::open(opt.clone()).expect("fail to open engine");
84
85    let put_res1 = engine.put(
86      Bytes::from("eecc".as_bytes().to_vec()),
87      util::rand_kv::get_test_value(10),
88    );
89    assert!(put_res1.is_ok());
90    let put_res2 = engine.put(
91      Bytes::from("aade".as_bytes().to_vec()),
92      util::rand_kv::get_test_value(11),
93    );
94    assert!(put_res2.is_ok());
95    let put_res3 = engine.put(
96      Bytes::from("ddce".as_bytes().to_vec()),
97      util::rand_kv::get_test_value(12),
98    );
99    assert!(put_res3.is_ok());
100    let put_res4 = engine.put(
101      Bytes::from("bbcc".as_bytes().to_vec()),
102      util::rand_kv::get_test_value(13),
103    );
104    assert!(put_res4.is_ok());
105
106    engine
107      .fold(|key, value| {
108        assert!(key.len() > 0);
109        assert!(value.len() > 0);
110        true
111      })
112      .unwrap();
113
114    // delete tested files
115    std::fs::remove_dir_all(opt.clone().dir_path).expect("failed to remove dir");
116  }
117
118  #[test]
119  fn test_iterator_list_keys() {
120    let mut opt = Options::default();
121    opt.dir_path = PathBuf::from("/tmp/bitkv-rs-iter-list_keys");
122    opt.data_file_size = 64 * 1024 * 1024; // 64MB
123    let engine = Engine::open(opt.clone()).expect("fail to open engine");
124
125    let keys1 = engine.list_keys();
126    assert_eq!(keys1.ok().unwrap().len(), 0);
127
128    let put_res1 = engine.put(
129      Bytes::from("aaccc".as_bytes().to_vec()),
130      util::rand_kv::get_test_value(10),
131    );
132    assert!(put_res1.is_ok());
133    let put_res2 = engine.put(
134      Bytes::from("eecc".as_bytes().to_vec()),
135      util::rand_kv::get_test_value(11),
136    );
137    assert!(put_res2.is_ok());
138    let put_res3 = engine.put(
139      Bytes::from("bbac".as_bytes().to_vec()),
140      util::rand_kv::get_test_value(12),
141    );
142    assert!(put_res3.is_ok());
143    let put_res4 = engine.put(
144      Bytes::from("ccde".as_bytes().to_vec()),
145      util::rand_kv::get_test_value(13),
146    );
147    assert!(put_res4.is_ok());
148
149    let keys2 = engine.list_keys();
150    assert_eq!(keys2.ok().unwrap().len(), 4);
151
152    // delete tested files
153    std::fs::remove_dir_all(opt.clone().dir_path).expect("failed to remove dir");
154  }
155
156  #[test]
157  fn test_iterator_seek() {
158    let mut opt = Options::default();
159    opt.dir_path = PathBuf::from("/tmp/bitkv-rs-iter-seek");
160    opt.data_file_size = 64 * 1024 * 1024; // 64MB
161    let engine = Engine::open(opt.clone()).expect("fail to open engine");
162
163    // no items
164    let iter1 = engine.iter(IteratorOptions::default());
165    iter1.seek("aa".as_bytes().to_vec());
166    assert!(iter1.next().is_none());
167
168    // put one item
169    let put_res1 = engine.put(
170      Bytes::from("aaccc".as_bytes().to_vec()),
171      util::rand_kv::get_test_value(10),
172    );
173    assert!(put_res1.is_ok());
174    let iter2 = engine.iter(IteratorOptions::default());
175    iter2.seek("a".as_bytes().to_vec());
176    assert!(iter2.next().is_some());
177
178    // multiple items
179
180    let put_res2 = engine.put(
181      Bytes::from("eecc".as_bytes().to_vec()),
182      util::rand_kv::get_test_value(11),
183    );
184    assert!(put_res2.is_ok());
185    let put_res3 = engine.put(
186      Bytes::from("bbac".as_bytes().to_vec()),
187      util::rand_kv::get_test_value(12),
188    );
189    assert!(put_res3.is_ok());
190    let put_res4 = engine.put(
191      Bytes::from("ccde".as_bytes().to_vec()),
192      util::rand_kv::get_test_value(13),
193    );
194    assert!(put_res4.is_ok());
195
196    let iter3 = engine.iter(IteratorOptions::default());
197    iter3.seek("a".as_bytes().to_vec());
198    assert_eq!(Bytes::from("aaccc"), iter3.next().unwrap().0);
199
200    // delete tested files
201    std::fs::remove_dir_all(opt.clone().dir_path).expect("failed to remove dir");
202  }
203
204  #[test]
205  fn test_iterator_next() {
206    let mut opt = Options::default();
207    opt.dir_path = PathBuf::from("/tmp/bitkv-rs-iter-next");
208    opt.data_file_size = 64 * 1024 * 1024; // 64MB
209    let engine = Engine::open(opt.clone()).expect("fail to open engine");
210
211    // put one item
212    let put_res1 = engine.put(
213      Bytes::from("eecc".as_bytes().to_vec()),
214      util::rand_kv::get_test_value(10),
215    );
216    assert!(put_res1.is_ok());
217    let iter1 = engine.iter(IteratorOptions::default());
218    iter1.seek("a".as_bytes().to_vec());
219    assert!(iter1.next().is_some());
220    iter1.rewind();
221    assert!(iter1.next().is_some());
222    assert!(iter1.next().is_none());
223
224    // multiple items
225    let put_res2 = engine.put(
226      Bytes::from("aade".as_bytes().to_vec()),
227      util::rand_kv::get_test_value(11),
228    );
229    assert!(put_res2.is_ok());
230    let put_res3 = engine.put(
231      Bytes::from("ddce".as_bytes().to_vec()),
232      util::rand_kv::get_test_value(12),
233    );
234    assert!(put_res3.is_ok());
235    let put_res4 = engine.put(
236      Bytes::from("bbcc".as_bytes().to_vec()),
237      util::rand_kv::get_test_value(13),
238    );
239    assert!(put_res4.is_ok());
240
241    let mut iter_opt = IteratorOptions::default();
242    iter_opt.reverse = true;
243    let iter2 = engine.iter(iter_opt);
244    while let Some(item) = iter2.next() {
245      assert!(item.0.len() > 0);
246    }
247
248    // delete tested files
249    std::fs::remove_dir_all(opt.clone().dir_path).expect("failed to remove dir");
250  }
251
252  #[test]
253  fn test_iterator_prefix() {
254    let mut opt = Options::default();
255    opt.dir_path = PathBuf::from("/tmp/bitkv-rs-iter-prefix");
256    opt.data_file_size = 64 * 1024 * 1024; // 64MB
257    let engine = Engine::open(opt.clone()).expect("fail to open engine");
258
259    let put_res1 = engine.put(
260      Bytes::from("eecc".as_bytes().to_vec()),
261      util::rand_kv::get_test_value(10),
262    );
263    assert!(put_res1.is_ok());
264    let put_res2 = engine.put(
265      Bytes::from("aade".as_bytes().to_vec()),
266      util::rand_kv::get_test_value(11),
267    );
268    assert!(put_res2.is_ok());
269    let put_res3 = engine.put(
270      Bytes::from("ddce".as_bytes().to_vec()),
271      util::rand_kv::get_test_value(12),
272    );
273    assert!(put_res3.is_ok());
274    let put_res4 = engine.put(
275      Bytes::from("bbcc".as_bytes().to_vec()),
276      util::rand_kv::get_test_value(13),
277    );
278    assert!(put_res4.is_ok());
279
280    let mut iter_opt = IteratorOptions::default();
281    iter_opt.prefix = "dd".as_bytes().to_vec();
282    let iter1 = engine.iter(iter_opt);
283    while let Some(item) = iter1.next() {
284      assert!(item.0.len() > 0);
285    }
286
287    // delete tested files
288    std::fs::remove_dir_all(opt.clone().dir_path).expect("failed to remove dir");
289  }
290}