// metaldb/migration/persistent_iter.rs

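//! Persistent iterators: iterators over database indexes that store their position
//! in the database, so that iteration can be resumed after a restart.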
2
3use anyhow::{bail, ensure};
4
5use std::{
6    borrow::{Borrow, Cow},
7    collections::HashSet,
8    fmt,
9    iter::Peekable,
10};
11
12use crate::{
13    access::{Access, AccessExt, RawAccess, RawAccessMut},
14    indexes::{Entries, IndexIterator},
15    BinaryKey, BinaryValue, Entry,
16};
17
18/// Persistent iterator position.
19#[derive(PartialEq)]
20enum IteratorPosition<K: BinaryKey + ?Sized> {
21    /// There is a next key to start iteration from.
22    NextKey(K::Owned),
23    /// The iterator has ended.
24    Ended,
25}
26
27impl<K> fmt::Debug for IteratorPosition<K>
28where
29    K: BinaryKey + fmt::Debug + ?Sized,
30{
31    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
32        match self {
33            Self::NextKey(key) => {
34                let key_ref: &K = key.borrow();
35                formatter.debug_tuple("NextKey").field(&key_ref).finish()
36            }
37            Self::Ended => formatter.debug_tuple("Ended").finish(),
38        }
39    }
40}
41
42impl<K> BinaryValue for IteratorPosition<K>
43where
44    K: BinaryKey + ?Sized,
45{
46    fn to_bytes(&self) -> Vec<u8> {
47        match self {
48            Self::NextKey(key) => {
49                let key: &K = key.borrow();
50                let mut buffer = vec![0; 1 + key.size()];
51                key.write(&mut buffer[1..]);
52                buffer
53            }
54            Self::Ended => vec![1],
55        }
56    }
57
58    fn from_bytes(bytes: Cow<'_, [u8]>) -> anyhow::Result<Self> {
59        ensure!(
60            !bytes.is_empty(),
61            "`IteratorPosition` serialization cannot be empty"
62        );
63        Ok(match bytes[0] {
64            0 => Self::NextKey(K::read(&bytes[1..])),
65            1 => Self::Ended,
66            _ => bail!("Invalid `IteratorPosition` discriminant"),
67        })
68    }
69}
70
71/// Persistent iterator that stores its position in the database.
72///
73/// Persistent iterators iterate over an index and automatically persist iteration
74/// results in the DB. This allows to build fault-tolerant migration scripts that work correctly
75/// after being restarted while merging the intermediate changes to the database.
76///
77/// Like indexes, persistent iterators are identified by an address. Likewise, they are subject
78/// to the borrowing rules (e.g., attempting to create two instances of the same iterator will
79/// result in a runtime error). When migrating data, it makes sense to store iterators
80/// in the associated [`Scratchpad`]. In this way, iterators will be automatically removed
81/// when the migration is over.
82///
83/// # Examples
84///
85/// [`MigrationHelper`] offers convenient iterator API via `iter_loop` method, which covers
86/// basic use cases. When `iter_loop` is not enough, a persistent iterator can be instantiated
87/// independently:
88///
89/// ```
90/// # use metaldb::{access::{AccessExt, CopyAccessExt}, Database, TemporaryDB};
91/// # use metaldb::migration::{MigrationHelper, PersistentIter};
92/// let db = TemporaryDB::new();
93/// // Create data for migration.
94/// let fork = db.fork();
95/// fork.get_list("migration.list").extend((0..123).map(|i| i.to_string()));
96/// db.merge(fork.into_patch()).unwrap();
97///
98/// let helper = MigrationHelper::new(db, "migration");
99/// // The old data is here.
100/// let list = helper.old_data().get_list::<_, String>("list");
101/// // In the context of migration, persistent iterators should use
102/// // the scratchpad data access.
103/// let iter = PersistentIter::new(&helper.scratchpad(), "list_iter", &list);
104/// // Now, we can use `iter` as any other iterator. Persistence is most useful
105/// // together with the `take` adapter; it allows to break migrated data
106/// // into manageable chunks.
107/// for (_, item) in iter.take(100) {
108///     // Migrate `item`. The first component of a tuple is the index of the item
109///     // in the list, which we ignore.
110/// }
111///
112/// // If we recreate the iterator, it will resume iteration from the last
113/// // known position (the element with 0-based index 100, in our case).
114/// let mut iter = PersistentIter::new(&helper.scratchpad(), "list_iter", &list);
115/// let (i, item) = iter.next().unwrap();
116/// assert_eq!(i, 100);
117/// assert_eq!(item, "100");
118/// assert_eq!(iter.count(), 22); // number of remaining items
119/// ```
120///
121/// [`Scratchpad`]: struct.Scratchpad.html
122/// [`MigrationHelper`]: struct.MigrationHelper.html
123pub struct PersistentIter<'a, T: RawAccess, I: IndexIterator> {
124    inner: Inner<'a, T, I>,
125}
126
127impl<T, I> fmt::Debug for PersistentIter<'_, T, I>
128where
129    T: RawAccess,
130    I: IndexIterator,
131    I::Key: fmt::Debug,
132{
133    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
134        formatter
135            .debug_struct("PersistentIter")
136            .field("inner", &self.inner)
137            .finish()
138    }
139}
140
141/// Internal details of a persistent iterator.
142enum Inner<'a, T: RawAccess, I: IndexIterator> {
143    /// The iterator is active: it has an underlying iterator over a database object,
144    /// and an entry storing the iterator position.
145    Active {
146        iter: Peekable<Entries<'a, I::Key, I::Value>>,
147        position_entry: Entry<T, IteratorPosition<I::Key>>,
148    },
149    /// The iterator has ended.
150    Ended,
151}
152
153impl<T, I> fmt::Debug for Inner<'_, T, I>
154where
155    T: RawAccess,
156    I: IndexIterator,
157    I::Key: fmt::Debug,
158{
159    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
160        match self {
161            Inner::Active { position_entry, .. } => formatter
162                .debug_struct("Active")
163                .field("position", &position_entry.get())
164                .finish(),
165            Inner::Ended => formatter.debug_tuple("Ended").finish(),
166        }
167    }
168}
169
170impl<'a, T, I> PersistentIter<'a, T, I>
171where
172    T: RawAccessMut,
173    I: IndexIterator,
174{
175    /// Creates a new persistent iterator.
176    pub fn new<A>(access: &A, name: &str, index: &'a I) -> Self
177    where
178        A: Access<Base = T>,
179    {
180        let position_entry: Entry<_, IteratorPosition<I::Key>> = access.get_entry(name);
181        let position = position_entry.get();
182
183        let start_key = match position {
184            None => None,
185            Some(IteratorPosition::NextKey(key)) => Some(key),
186            Some(IteratorPosition::Ended) => {
187                return Self {
188                    inner: Inner::Ended,
189                };
190            }
191        };
192
193        Self {
194            inner: Inner::Active {
195                iter: index
196                    .index_iter(start_key.as_ref().map(Borrow::borrow))
197                    .peekable(),
198                position_entry,
199            },
200        }
201    }
202
203    /// Skips values in the iterator output without parsing them.
204    pub fn skip_values(self) -> PersistentKeys<'a, T, I> {
205        PersistentKeys { base_iter: self }
206    }
207}
208
209impl<T, I> Iterator for PersistentIter<'_, T, I>
210where
211    T: RawAccessMut,
212    I: IndexIterator,
213{
214    type Item = (<I::Key as ToOwned>::Owned, I::Value);
215
216    fn next(&mut self) -> Option<Self::Item> {
217        if let Inner::Active {
218            ref mut iter,
219            ref mut position_entry,
220        } = self.inner
221        {
222            let next = iter.next();
223            if next.is_some() {
224                position_entry.set(if let Some((key, _)) = iter.peek() {
225                    // Slightly clumsy way to clone the key.
226                    IteratorPosition::NextKey(key.borrow().to_owned())
227                } else {
228                    IteratorPosition::Ended
229                });
230            } else {
231                position_entry.set(IteratorPosition::Ended);
232                self.inner = Inner::Ended;
233            }
234            next
235        } else {
236            None
237        }
238    }
239}
240
241/// Persistent iterator over index keys that stores its position in the database.
242///
243/// This iterator can be used similarly to [`PersistentIter`]; the only difference is the
244/// type of items yielded by the iterator.
245///
246/// [`PersistentIter`]: struct.PersistentIter.html
247pub struct PersistentKeys<'a, T: RawAccess, I: IndexIterator> {
248    base_iter: PersistentIter<'a, T, I>,
249}
250
251impl<'a, T, I> PersistentKeys<'a, T, I>
252where
253    T: RawAccessMut,
254    I: IndexIterator,
255{
256    /// Creates a new persistent iterator.
257    pub fn new<A>(access: &A, name: &str, index: &'a I) -> Self
258    where
259        A: Access<Base = T>,
260    {
261        PersistentIter::new(access, name, index).skip_values()
262    }
263}
264
265impl<T, I> fmt::Debug for PersistentKeys<'_, T, I>
266where
267    T: RawAccess,
268    I: IndexIterator,
269    I::Key: fmt::Debug,
270{
271    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
272        formatter
273            .debug_struct("PersistentIter")
274            .field("inner", &self.base_iter.inner)
275            .finish()
276    }
277}
278
279impl<T, I> Iterator for PersistentKeys<'_, T, I>
280where
281    T: RawAccessMut,
282    I: IndexIterator,
283{
284    type Item = <I::Key as ToOwned>::Owned;
285
286    fn next(&mut self) -> Option<Self::Item> {
287        self.base_iter.next().map(|(key, _)| key)
288    }
289}
290
/// Factory for persistent iterators.
///
/// The factory remembers the name of every iterator created through it, so that it can
/// later check whether all of those iterators have ended.
#[derive(Debug)]
pub struct PersistentIters<T> {
    /// Access through which iterators are created and their positions are read.
    access: T,
    /// Names of all iterators created via this factory.
    names: HashSet<String>,
}
297
298impl<T> PersistentIters<T>
299where
300    T: Access,
301    T::Base: RawAccessMut,
302{
303    /// Creates a new factory.
304    pub fn new(access: T) -> Self {
305        Self {
306            access,
307            names: HashSet::new(),
308        }
309    }
310
311    /// Creates a persistent iterator identified by the `name`.
312    pub fn create<'a, I: IndexIterator>(
313        &mut self,
314        name: &str,
315        index: &'a I,
316    ) -> PersistentIter<'a, T::Base, I> {
317        self.names.insert(name.to_owned());
318        PersistentIter::new(&self.access, name, index)
319    }
320
321    /// Checks if all iterators instantiated via this instance have ended.
322    ///
323    /// This method will panic if any of iterators are borrowed and thus should only be called
324    /// when this is a priori not the case.
325    pub(super) fn all_ended(&self) -> bool {
326        for name in &self.names {
327            let pos = self
328                .access
329                .clone()
330                .get_entry::<_, IteratorPosition<()>>(name.as_str())
331                .get();
332            if pos != Some(IteratorPosition::Ended) {
333                return false;
334            }
335        }
336        true
337    }
338}
339
#[cfg(test)]
mod tests {
    use super::{AccessExt, IteratorPosition, PersistentIter, PersistentKeys};
    use crate::{
        access::CopyAccessExt, migration::Scratchpad, BinaryValue, Database, MapIndex,
        TemporaryDB,
    };
    use std::borrow::Cow;

    /// Iteration over a map can be split into chunks and resumed from the persisted position.
    #[test]
    fn persistent_iter_for_map() {
        let db = TemporaryDB::new();
        let fork = db.fork();
        let mut map = fork.get_map("map");
        for i in 0_u32..10 {
            map.put(&i, i.to_string());
        }

        let scratchpad = Scratchpad::new("iter", &fork);
        let iter = PersistentIter::new(&scratchpad, "map", &map);
        let mut count = 0;
        for (i, (key, value)) in iter.take(5).enumerate() {
            assert_eq!(key, i as u32);
            assert_eq!(value, i.to_string());
            count += 1;
        }
        assert_eq!(count, 5);
        {
            let position_entry = scratchpad.get_entry::<_, IteratorPosition<u32>>("map");
            assert_eq!(position_entry.get(), Some(IteratorPosition::NextKey(5)));
        }

        // Resume the iterator.
        let iter = PersistentIter::new(&scratchpad, "map", &map);
        count = 0;
        for (i, (key, value)) in (5..).zip(iter) {
            assert_eq!(key, i as u32);
            assert_eq!(value, i.to_string());
            count += 1;
        }
        assert_eq!(count, 5);
        {
            let position_entry = scratchpad.get_entry::<_, IteratorPosition<u32>>("map");
            assert_eq!(position_entry.get(), Some(IteratorPosition::Ended));
        }

        // The iterator is ended now.
        let iter = PersistentIter::new(&scratchpad, "map", &map);
        assert_eq!(iter.count(), 0);
    }

    /// Keys of unsized types (`str`) are persisted and restored correctly. `take_while`
    /// consumes the first rejected item, so the persisted position points past it.
    #[test]
    fn persistent_iter_with_unsized_keys() {
        let db = TemporaryDB::new();
        let fork = db.fork();
        let mut map: MapIndex<_, str, u64> = fork.get_map("map");
        let words = ["How", "many", "letters", "are", "in", "this", "word", "?"];
        for &word in &words {
            map.put(word, word.len() as u64);
        }

        let scratchpad = Scratchpad::new("iter", &fork);
        let iter = PersistentIter::new(&scratchpad, "map", &map);
        for (word, size) in iter.take_while(|(word, _)| word.as_str() < "many") {
            assert!(words.contains(&word.as_str()));
            assert_eq!(word.len() as u64, size);
        }

        {
            let position_entry = scratchpad.get_entry::<_, IteratorPosition<str>>("map");
            // Note that `many` is not included into the values yielded by the iterator,
            // but the iterator is advanced past it.
            let expected_pos = IteratorPosition::NextKey("this".to_owned());
            assert_eq!(position_entry.get(), Some(expected_pos));
        }

        let iter = PersistentIter::new(&scratchpad, "map", &map);
        assert_eq!(
            iter.collect::<Vec<_>>(),
            vec![("this".to_owned(), 4), ("word".to_owned(), 4)]
        );
    }

    /// Iteration over a list works together with standard iterator adapters.
    #[test]
    fn persistent_iter_for_list() {
        let db = TemporaryDB::new();
        let fork = db.fork();
        let mut list = fork.get_list("list");
        list.extend((0_u32..10).map(|i| i.to_string()));

        let scratchpad = Scratchpad::new("iter", &fork);
        let iter = PersistentIter::new(&scratchpad, "list", &list);
        // Test that iterators work with adapters as expected.
        let items: Vec<_> = iter.take(5).filter(|(i, _)| i % 2 == 1).collect();
        assert_eq!(items, vec![(1, "1".to_owned()), (3, "3".to_owned())]);

        {
            let position_entry = scratchpad.get_entry::<_, IteratorPosition<u64>>("list");
            assert_eq!(position_entry.get(), Some(IteratorPosition::NextKey(5)));
        }

        let iter = PersistentIter::new(&scratchpad, "list", &list);
        for (i, value) in iter.take(3) {
            assert_eq!(i.to_string(), value);
        }

        {
            let position_entry = scratchpad.get_entry::<_, IteratorPosition<u64>>("list");
            assert_eq!(position_entry.get(), Some(IteratorPosition::NextKey(8)));
        }

        let iter = PersistentIter::new(&scratchpad, "list", &list);
        assert_eq!(iter.count(), 2);
    }

    /// An iterator over an empty index immediately records the `Ended` position.
    #[test]
    fn empty_persistent_iter() {
        let db = TemporaryDB::new();
        let fork = db.fork();
        let list = fork.get_list::<_, String>("list");

        let scratchpad = Scratchpad::new("iter", &fork);
        let iter = PersistentIter::new(&scratchpad, "list", &list);
        assert_eq!(iter.count(), 0);
        let position_entry = scratchpad.get_entry::<_, IteratorPosition<u64>>("list");
        assert_eq!(position_entry.get(), Some(IteratorPosition::Ended));
    }

    /// Gaps in a sparse list are skipped, and the persisted position is the next present index.
    #[test]
    fn persistent_iter_for_sparse_list() {
        let db = TemporaryDB::new();
        let fork = db.fork();
        let mut list = fork.get_sparse_list("list");
        for &i in &[0, 1, 2, 3, 5, 8, 13, 21] {
            list.set(i, i.to_string());
        }

        let scratchpad = Scratchpad::new("iter", &fork);
        let iter = PersistentIter::new(&scratchpad, "list", &list);
        let mut count = 0;
        for (i, value) in iter.take(5) {
            assert_eq!(value, i.to_string());
            count += 1;
        }
        assert_eq!(count, 5);
        {
            let position_entry = scratchpad.get_entry::<_, IteratorPosition<u64>>("list");
            assert_eq!(position_entry.get(), Some(IteratorPosition::NextKey(8)));
        }

        let iter = PersistentIter::new(&scratchpad, "list", &list);
        let indexes: Vec<_> = iter.map(|(i, _)| i).collect();
        assert_eq!(indexes, vec![8, 13, 21]);
    }

    /// `PersistentKeys` persists its position just like `PersistentIter`.
    #[test]
    fn persistent_iter_for_key_set() {
        let db = TemporaryDB::new();
        let fork = db.fork();
        let mut set = fork.get_key_set("set");
        for i in &[0_u16, 1, 2, 3, 5, 8, 13, 21] {
            set.insert(i);
        }

        let scratchpad = Scratchpad::new("iter", &fork);
        let iter = PersistentKeys::new(&scratchpad, "set", &set);
        let head: Vec<_> = iter.take(3).collect();
        assert_eq!(head, vec![0, 1, 2]);

        {
            let mut iter = PersistentKeys::new(&scratchpad, "set", &set);
            assert_eq!(iter.nth(2), Some(8));
        }
        {
            let position_entry = scratchpad.get_entry::<_, IteratorPosition<u16>>("set");
            assert_eq!(position_entry.get(), Some(IteratorPosition::NextKey(13)));
        }

        let iter = PersistentKeys::new(&scratchpad, "set", &set);
        let tail: Vec<_> = iter.collect();
        assert_eq!(tail, vec![13, 21]);
    }

    /// `IteratorPosition` round-trips through its binary form and rejects malformed input.
    #[test]
    fn iterator_position_serialization() {
        let position = IteratorPosition::<u32>::NextKey(42);
        let bytes = position.to_bytes();
        // `NextKey` is tagged with a leading `0` byte.
        assert_eq!(bytes[0], 0);
        let restored = IteratorPosition::<u32>::from_bytes(Cow::Owned(bytes)).unwrap();
        assert_eq!(restored, position);

        let ended = IteratorPosition::<str>::Ended;
        assert_eq!(ended.to_bytes(), vec![1]);
        let restored = IteratorPosition::<str>::from_bytes(Cow::Borrowed(&[1_u8][..])).unwrap();
        assert_eq!(restored, ended);

        let empty: &[u8] = &[];
        assert!(IteratorPosition::<u32>::from_bytes(Cow::Borrowed(empty)).is_err());
        assert!(IteratorPosition::<u32>::from_bytes(Cow::Borrowed(&[2_u8][..])).is_err());
    }
}