1use anyhow::{bail, ensure};
4
5use std::{
6 borrow::{Borrow, Cow},
7 collections::HashSet,
8 fmt,
9 iter::Peekable,
10};
11
12use crate::{
13 access::{Access, AccessExt, RawAccess, RawAccessMut},
14 indexes::{Entries, IndexIterator},
15 BinaryKey, BinaryValue, Entry,
16};
17
18#[derive(PartialEq)]
20enum IteratorPosition<K: BinaryKey + ?Sized> {
21 NextKey(K::Owned),
23 Ended,
25}
26
27impl<K> fmt::Debug for IteratorPosition<K>
28where
29 K: BinaryKey + fmt::Debug + ?Sized,
30{
31 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
32 match self {
33 Self::NextKey(key) => {
34 let key_ref: &K = key.borrow();
35 formatter.debug_tuple("NextKey").field(&key_ref).finish()
36 }
37 Self::Ended => formatter.debug_tuple("Ended").finish(),
38 }
39 }
40}
41
42impl<K> BinaryValue for IteratorPosition<K>
43where
44 K: BinaryKey + ?Sized,
45{
46 fn to_bytes(&self) -> Vec<u8> {
47 match self {
48 Self::NextKey(key) => {
49 let key: &K = key.borrow();
50 let mut buffer = vec![0; 1 + key.size()];
51 key.write(&mut buffer[1..]);
52 buffer
53 }
54 Self::Ended => vec![1],
55 }
56 }
57
58 fn from_bytes(bytes: Cow<'_, [u8]>) -> anyhow::Result<Self> {
59 ensure!(
60 !bytes.is_empty(),
61 "`IteratorPosition` serialization cannot be empty"
62 );
63 Ok(match bytes[0] {
64 0 => Self::NextKey(K::read(&bytes[1..])),
65 1 => Self::Ended,
66 _ => bail!("Invalid `IteratorPosition` discriminant"),
67 })
68 }
69}
70
71pub struct PersistentIter<'a, T: RawAccess, I: IndexIterator> {
124 inner: Inner<'a, T, I>,
125}
126
127impl<T, I> fmt::Debug for PersistentIter<'_, T, I>
128where
129 T: RawAccess,
130 I: IndexIterator,
131 I::Key: fmt::Debug,
132{
133 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
134 formatter
135 .debug_struct("PersistentIter")
136 .field("inner", &self.inner)
137 .finish()
138 }
139}
140
141enum Inner<'a, T: RawAccess, I: IndexIterator> {
143 Active {
146 iter: Peekable<Entries<'a, I::Key, I::Value>>,
147 position_entry: Entry<T, IteratorPosition<I::Key>>,
148 },
149 Ended,
151}
152
153impl<T, I> fmt::Debug for Inner<'_, T, I>
154where
155 T: RawAccess,
156 I: IndexIterator,
157 I::Key: fmt::Debug,
158{
159 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
160 match self {
161 Inner::Active { position_entry, .. } => formatter
162 .debug_struct("Active")
163 .field("position", &position_entry.get())
164 .finish(),
165 Inner::Ended => formatter.debug_tuple("Ended").finish(),
166 }
167 }
168}
169
170impl<'a, T, I> PersistentIter<'a, T, I>
171where
172 T: RawAccessMut,
173 I: IndexIterator,
174{
175 pub fn new<A>(access: &A, name: &str, index: &'a I) -> Self
177 where
178 A: Access<Base = T>,
179 {
180 let position_entry: Entry<_, IteratorPosition<I::Key>> = access.get_entry(name);
181 let position = position_entry.get();
182
183 let start_key = match position {
184 None => None,
185 Some(IteratorPosition::NextKey(key)) => Some(key),
186 Some(IteratorPosition::Ended) => {
187 return Self {
188 inner: Inner::Ended,
189 };
190 }
191 };
192
193 Self {
194 inner: Inner::Active {
195 iter: index
196 .index_iter(start_key.as_ref().map(Borrow::borrow))
197 .peekable(),
198 position_entry,
199 },
200 }
201 }
202
203 pub fn skip_values(self) -> PersistentKeys<'a, T, I> {
205 PersistentKeys { base_iter: self }
206 }
207}
208
209impl<T, I> Iterator for PersistentIter<'_, T, I>
210where
211 T: RawAccessMut,
212 I: IndexIterator,
213{
214 type Item = (<I::Key as ToOwned>::Owned, I::Value);
215
216 fn next(&mut self) -> Option<Self::Item> {
217 if let Inner::Active {
218 ref mut iter,
219 ref mut position_entry,
220 } = self.inner
221 {
222 let next = iter.next();
223 if next.is_some() {
224 position_entry.set(if let Some((key, _)) = iter.peek() {
225 IteratorPosition::NextKey(key.borrow().to_owned())
227 } else {
228 IteratorPosition::Ended
229 });
230 } else {
231 position_entry.set(IteratorPosition::Ended);
232 self.inner = Inner::Ended;
233 }
234 next
235 } else {
236 None
237 }
238 }
239}
240
241pub struct PersistentKeys<'a, T: RawAccess, I: IndexIterator> {
248 base_iter: PersistentIter<'a, T, I>,
249}
250
251impl<'a, T, I> PersistentKeys<'a, T, I>
252where
253 T: RawAccessMut,
254 I: IndexIterator,
255{
256 pub fn new<A>(access: &A, name: &str, index: &'a I) -> Self
258 where
259 A: Access<Base = T>,
260 {
261 PersistentIter::new(access, name, index).skip_values()
262 }
263}
264
265impl<T, I> fmt::Debug for PersistentKeys<'_, T, I>
266where
267 T: RawAccess,
268 I: IndexIterator,
269 I::Key: fmt::Debug,
270{
271 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
272 formatter
273 .debug_struct("PersistentIter")
274 .field("inner", &self.base_iter.inner)
275 .finish()
276 }
277}
278
279impl<T, I> Iterator for PersistentKeys<'_, T, I>
280where
281 T: RawAccessMut,
282 I: IndexIterator,
283{
284 type Item = <I::Key as ToOwned>::Owned;
285
286 fn next(&mut self) -> Option<Self::Item> {
287 self.base_iter.next().map(|(key, _)| key)
288 }
289}
290
291#[derive(Debug)]
293pub struct PersistentIters<T> {
294 access: T,
295 names: HashSet<String>,
296}
297
298impl<T> PersistentIters<T>
299where
300 T: Access,
301 T::Base: RawAccessMut,
302{
303 pub fn new(access: T) -> Self {
305 Self {
306 access,
307 names: HashSet::new(),
308 }
309 }
310
311 pub fn create<'a, I: IndexIterator>(
313 &mut self,
314 name: &str,
315 index: &'a I,
316 ) -> PersistentIter<'a, T::Base, I> {
317 self.names.insert(name.to_owned());
318 PersistentIter::new(&self.access, name, index)
319 }
320
321 pub(super) fn all_ended(&self) -> bool {
326 for name in &self.names {
327 let pos = self
328 .access
329 .clone()
330 .get_entry::<_, IteratorPosition<()>>(name.as_str())
331 .get();
332 if pos != Some(IteratorPosition::Ended) {
333 return false;
334 }
335 }
336 true
337 }
338}
339
340#[cfg(test)]
341mod tests {
342 use super::{AccessExt, IteratorPosition, PersistentIter, PersistentKeys};
343 use crate::{access::CopyAccessExt, migration::Scratchpad, Database, MapIndex, TemporaryDB};
344
345 #[test]
346 fn persistent_iter_for_map() {
347 let db = TemporaryDB::new();
348 let fork = db.fork();
349 let mut map = fork.get_map("map");
350 for i in 0_u32..10 {
351 map.put(&i, i.to_string());
352 }
353
354 let scratchpad = Scratchpad::new("iter", &fork);
355 let iter = PersistentIter::new(&scratchpad, "map", &map);
356 let mut count = 0;
357 for (i, (key, value)) in iter.take(5).enumerate() {
358 assert_eq!(key, i as u32);
359 assert_eq!(value, i.to_string());
360 count += 1;
361 }
362 assert_eq!(count, 5);
363 {
364 let position_entry = scratchpad.get_entry::<_, IteratorPosition<u32>>("map");
365 assert_eq!(position_entry.get(), Some(IteratorPosition::NextKey(5)));
366 }
367
368 let iter = PersistentIter::new(&scratchpad, "map", &map);
370 count = 0;
371 for (i, (key, value)) in (5..).zip(iter) {
372 assert_eq!(key, i as u32);
373 assert_eq!(value, i.to_string());
374 count += 1;
375 }
376 assert_eq!(count, 5);
377 {
378 let position_entry = scratchpad.get_entry::<_, IteratorPosition<u32>>("map");
379 assert_eq!(position_entry.get(), Some(IteratorPosition::Ended));
380 }
381
382 let iter = PersistentIter::new(&scratchpad, "map", &map);
384 assert_eq!(iter.count(), 0);
385 }
386
387 #[test]
388 fn persistent_iter_with_unsized_keys() {
389 let db = TemporaryDB::new();
390 let fork = db.fork();
391 let mut map: MapIndex<_, str, u64> = fork.get_map("map");
392 let words = ["How", "many", "letters", "are", "in", "this", "word", "?"];
393 for &word in &words {
394 map.put(word, word.len() as u64);
395 }
396
397 let scratchpad = Scratchpad::new("iter", &fork);
398 let iter = PersistentIter::new(&scratchpad, "map", &map);
399 for (word, size) in iter.take_while(|(word, _)| word.as_str() < "many") {
400 assert!(words.contains(&word.as_str()));
401 assert_eq!(word.len() as u64, size);
402 }
403
404 {
405 let position_entry = scratchpad.get_entry::<_, IteratorPosition<str>>("map");
406 let expected_pos = IteratorPosition::NextKey("this".to_owned());
409 assert_eq!(position_entry.get(), Some(expected_pos));
410 }
411
412 let iter = PersistentIter::new(&scratchpad, "map", &map);
413 assert_eq!(
414 iter.collect::<Vec<_>>(),
415 vec![("this".to_owned(), 4), ("word".to_owned(), 4)]
416 );
417 }
418
419 #[test]
420 fn persistent_iter_for_list() {
421 let db = TemporaryDB::new();
422 let fork = db.fork();
423 let mut list = fork.get_list("list");
424 list.extend((0_u32..10).map(|i| i.to_string()));
425
426 let scratchpad = Scratchpad::new("iter", &fork);
427 let iter = PersistentIter::new(&scratchpad, "list", &list);
428 let items: Vec<_> = iter.take(5).filter(|(i, _)| i % 2 == 1).collect();
430 assert_eq!(items, vec![(1, "1".to_owned()), (3, "3".to_owned())]);
431
432 {
433 let position_entry = scratchpad.get_entry::<_, IteratorPosition<u64>>("list");
434 assert_eq!(position_entry.get(), Some(IteratorPosition::NextKey(5)));
435 }
436
437 let iter = PersistentIter::new(&scratchpad, "list", &list);
438 for (i, value) in iter.take(3) {
439 assert_eq!(i.to_string(), value);
440 }
441
442 {
443 let position_entry = scratchpad.get_entry::<_, IteratorPosition<u64>>("list");
444 assert_eq!(position_entry.get(), Some(IteratorPosition::NextKey(8)));
445 }
446
447 let iter = PersistentIter::new(&scratchpad, "list", &list);
448 assert_eq!(iter.count(), 2);
449 }
450
451 #[test]
452 fn empty_persistent_iter() {
453 let db = TemporaryDB::new();
454 let fork = db.fork();
455 let list = fork.get_list::<_, String>("list");
456
457 let scratchpad = Scratchpad::new("iter", &fork);
458 let iter = PersistentIter::new(&scratchpad, "list", &list);
459 assert_eq!(iter.count(), 0);
460 let position_entry = scratchpad.get_entry::<_, IteratorPosition<u64>>("list");
461 assert_eq!(position_entry.get(), Some(IteratorPosition::Ended));
462 }
463
464 #[test]
465 fn persistent_iter_for_sparse_list() {
466 let db = TemporaryDB::new();
467 let fork = db.fork();
468 let mut list = fork.get_sparse_list("list");
469 for &i in &[0, 1, 2, 3, 5, 8, 13, 21] {
470 list.set(i, i.to_string());
471 }
472
473 let scratchpad = Scratchpad::new("iter", &fork);
474 let iter = PersistentIter::new(&scratchpad, "list", &list);
475 let mut count = 0;
476 for (i, value) in iter.take(5) {
477 assert_eq!(value, i.to_string());
478 count += 1;
479 }
480 assert_eq!(count, 5);
481 {
482 let position_entry = scratchpad.get_entry::<_, IteratorPosition<u64>>("list");
483 assert_eq!(position_entry.get(), Some(IteratorPosition::NextKey(8)));
484 }
485
486 let iter = PersistentIter::new(&scratchpad, "list", &list);
487 let indexes: Vec<_> = iter.map(|(i, _)| i).collect();
488 assert_eq!(indexes, vec![8, 13, 21]);
489 }
490
491 #[test]
492 fn persistent_iter_for_key_set() {
493 let db = TemporaryDB::new();
494 let fork = db.fork();
495 let mut set = fork.get_key_set("set");
496 for i in &[0_u16, 1, 2, 3, 5, 8, 13, 21] {
497 set.insert(i);
498 }
499
500 let scratchpad = Scratchpad::new("iter", &fork);
501 let iter = PersistentKeys::new(&scratchpad, "set", &set);
502 let head: Vec<_> = iter.take(3).collect();
503 assert_eq!(head, vec![0, 1, 2]);
504
505 {
506 let mut iter = PersistentKeys::new(&scratchpad, "set", &set);
507 assert_eq!(iter.nth(2), Some(8));
508 }
509 {
510 let position_entry = scratchpad.get_entry::<_, IteratorPosition<u16>>("set");
511 assert_eq!(position_entry.get(), Some(IteratorPosition::NextKey(13)));
512 }
513
514 let iter = PersistentKeys::new(&scratchpad, "set", &set);
515 let tail: Vec<_> = iter.collect();
516 assert_eq!(tail, vec![13, 21]);
517 }
518}