graphannis_core/util/
disk_collections.rs

1use bincode::config::Options;
2use itertools::Itertools;
3use serde::de::DeserializeOwned;
4use serde::{Deserialize, Serialize};
5use sstable::{SSIterator, Table, TableBuilder, TableIterator};
6use transient_btree_index::{BtreeConfig, BtreeIndex};
7
8use crate::serializer::KeyVec;
9use crate::{errors::Result, serializer::KeySerializer};
10use std::borrow::Cow;
11use std::collections::BTreeMap;
12use std::iter::{FusedIterator, Peekable};
13use std::marker::PhantomData;
14use std::ops::{Bound, RangeBounds};
15use std::path::Path;
16
/// Number of bytes in one kibibyte.
const KB: usize = 1 << 10;
/// Number of bytes in one mebibyte.
pub const MB: usize = KB * KB;
/// Block size in bytes that is assumed when a cache capacity given in bytes
/// is converted into a number of cached blocks.
const BLOCK_MAX_SIZE: usize = 4 * KB;

/// Uses a cache for each disk table with 8 MB capacity.
pub const DEFAULT_BLOCK_CACHE_CAPACITY: usize = 8 * MB;
23
/// A serializable pair of a key and the value associated with it.
#[derive(Serialize, Deserialize)]
struct Entry<K, V>
where
    K: Ord,
{
    /// The (orderable) key of the entry.
    key: K,
    /// The value stored for `key`.
    value: V,
}
32
/// Determines when the in-memory part of a disk map is moved to disk.
pub enum EvictionStrategy {
    /// Evict as soon as the in-memory map holds at least this many entries.
    MaximumItems(usize),
}

impl Default for EvictionStrategy {
    /// Evicts once 10,000 entries are held in memory.
    fn default() -> Self {
        Self::MaximumItems(10_000)
    }
}
42
/// A sorted map that keeps recent changes in memory and can move them to disk.
///
/// The map consists of up to three levels which are queried from newest to oldest:
/// - C0: an in-memory B-tree map,
/// - C1: a B-tree disk index that receives the content of C0 on eviction,
/// - C2: a read-only sstable loaded from an existing file.
///
/// In C0 and C1 a value of `None` is a tombstone: it marks a key as deleted and
/// hides entries with the same key in older levels.
pub struct DiskMap<K, V>
where
    K: 'static + KeySerializer + Serialize + DeserializeOwned + Clone + Send + Sync + Ord,
    for<'de> V: 'static + Serialize + Deserialize<'de> + Clone + Send + Sync,
{
    /// Decides when C0 is evicted into C1.
    eviction_strategy: EvictionStrategy,
    /// Cache capacity in bytes used when opening or writing sstables.
    block_cache_capacity: usize,
    /// In-memory level holding the most recent insertions and tombstones.
    c0: BTreeMap<K, Option<V>>,
    /// B-tree disk index; created lazily by the first compaction.
    c1: Option<BtreeIndex<K, Option<V>>>,
    /// Read-only sstable opened from a persisted file, if any.
    c2: Option<Table>,
    /// Bincode configuration used to (de-)serialize the values stored in C2.
    serialization: bincode::config::DefaultOptions,

    /// Configuration used when the C1 index is created.
    c1_btree_config: BtreeConfig,
}
57
58fn custom_options(block_cache_capacity: usize) -> sstable::Options {
59    let blocks = (block_cache_capacity / BLOCK_MAX_SIZE).max(1);
60    sstable::Options::default().with_cache_capacity(blocks)
61}
62
63impl<K, V> DiskMap<K, V>
64where
65    K: 'static + Clone + KeySerializer + Serialize + DeserializeOwned + Send + Sync + Ord,
66    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send + Sync,
67{
68    pub fn new(
69        persisted_file: Option<&Path>,
70        eviction_strategy: EvictionStrategy,
71        block_cache_capacity: usize,
72        c1_config: BtreeConfig,
73    ) -> Result<DiskMap<K, V>> {
74        let mut disk_table = None;
75
76        if let Some(persisted_file) = persisted_file
77            && persisted_file.is_file()
78        {
79            // Use existing file as read-only table which contains the whole map
80            let table = Table::new_from_file(custom_options(block_cache_capacity), persisted_file)?;
81            disk_table = Some(table);
82        }
83
84        Ok(DiskMap {
85            eviction_strategy,
86            block_cache_capacity,
87            c0: BTreeMap::default(),
88            c2: disk_table,
89            serialization: bincode::options(),
90            c1: None,
91            c1_btree_config: c1_config,
92        })
93    }
94
95    pub fn new_temporary(
96        eviction_strategy: EvictionStrategy,
97        block_cache_capacity: usize,
98        c1_config: BtreeConfig,
99    ) -> DiskMap<K, V> {
100        DiskMap {
101            eviction_strategy,
102            block_cache_capacity,
103            c0: BTreeMap::default(),
104            c2: None,
105            serialization: bincode::options(),
106            c1: None,
107            c1_btree_config: c1_config,
108        }
109    }
110
111    pub fn insert(&mut self, key: K, value: V) -> Result<()> {
112        self.c0.insert(key, Some(value));
113
114        self.evict_c0_if_necessary()?;
115
116        Ok(())
117    }
118
119    pub fn get(&self, key: &K) -> Result<Option<Cow<'_, V>>> {
120        // Check C0 first
121        if let Some(entry) = self.c0.get(key) {
122            if let Some(value) = entry {
123                return Ok(Some(Cow::Borrowed(value)));
124            } else {
125                // Value was explicitly deleted with a tombstone entry.
126                // Do not query C1 and C2.
127                return Ok(None);
128            }
129        }
130        // Check C1 (BTree disk index)
131        if let Some(c1) = &self.c1
132            && let Some(entry) = c1.get(key)?
133        {
134            if let Some(value) = entry {
135                return Ok(Some(Cow::Owned(value)));
136            } else {
137                // Value was explicitly deleted with a tombstone entry.
138                // Do not query C1 and C2.
139                return Ok(None);
140            }
141        }
142
143        // Check the C2 (sstable)
144        if let Some(c2) = &self.c2 {
145            let key = K::create_key(key);
146            if let Some(value) = c2.get(&key)? {
147                let value: Option<V> = self.serialization.deserialize(&value)?;
148                if let Some(value) = value {
149                    return Ok(Some(Cow::Owned(value)));
150                } else {
151                    // Value was explicitly deleted (and still written to disk)
152                    return Ok(None);
153                }
154            }
155        }
156
157        Ok(None)
158    }
159
160    pub fn contains_key(&self, key: &K) -> Result<bool> {
161        // Check C0 first
162        if let Some(value) = self.c0.get(key) {
163            if value.is_some() {
164                return Ok(true);
165            } else {
166                // Value was explicitly deleted, do not query the disk tables
167                return Ok(false);
168            }
169        }
170
171        // Check C1 (BTree disk index)
172        if let Some(c1) = &self.c1
173            && c1.contains_key(key)?
174        {
175            return Ok(true);
176        }
177
178        // Use a iterator on the single disk to check if there is an entry with this, without getting the value.
179        // Since we don't serialize tombstone entries when compacting or writing the disk table to an output file,
180        // when we are checking the key, we can safely assume the value is Some() and not None.
181        if let Some(c2) = &self.c2 {
182            let mut table_it = c2.iter();
183            let key = K::create_key(key);
184            table_it.seek(&key);
185            if let Some(it_key) = table_it.current_key()
186                && it_key == key.as_ref()
187            {
188                return Ok(true);
189            }
190        }
191        Ok(false)
192    }
193
194    pub fn remove(&mut self, key: &K) -> Result<Option<V>> {
195        let existing = self.get(key)?.map(|existing| existing.into_owned());
196        if existing.is_some() {
197            // Add tombstone entry
198            self.c0.insert(key.clone(), None);
199
200            self.evict_c0_if_necessary()?;
201        }
202
203        Ok(existing)
204    }
205
206    pub fn iter(&self) -> Result<ResultIterator<'_, K, V>> {
207        if let Some(c1) = &self.c1 {
208            if self.c0.is_empty() && self.c2.is_none() {
209                // Create an iterator that skips the thombstone entries
210                let it = c1
211                    .range(..)?
212                    .filter_map_ok(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
213                    .map(|entry| entry.map_err(|e| e.into()));
214
215                return Ok(Box::new(it));
216            }
217        } else if let Some(c2) = &self.c2 {
218            if self.c0.is_empty() && self.c1.as_ref().is_none_or(|c1| c1.is_empty()) {
219                let table_iterator = c2.iter();
220                let it = SingleTableIterator {
221                    table_iterator,
222                    serialization: self.serialization,
223                    phantom: PhantomData,
224                };
225                return Ok(Box::new(it));
226            }
227        } else {
228            // Create an iterator that skips the thombstone entries
229            let it = self
230                .c0
231                .iter()
232                .filter_map(|(k, v)| v.as_ref().map(|v| Ok((k.clone(), v.clone()))));
233            return Ok(Box::new(it));
234        }
235
236        // Use the flexible range iterator as default
237        Ok(Box::new(self.range(..)))
238    }
239
240    /// Returns an iterator over a range of entries.
241    pub fn range<'a, R>(&'a self, range: R) -> Box<dyn Iterator<Item = Result<(K, V)>> + 'a>
242    where
243        R: RangeBounds<K> + Clone,
244    {
245        // Check if C0, C1 or C2 are the only non-empty maps and return a specialized iterator
246        if let Some(c1) = &self.c1 {
247            if self.c0.is_empty() && self.c2.is_none() {
248                let c1_range = match c1.range(range).map_err(|e| e.into()) {
249                    Ok(c1_range) => c1_range,
250                    Err(e) => return Box::new(std::iter::once(Err(e))),
251                };
252                // Return iterator over C1 that skips the tombstone entries
253                let it = c1_range
254                    .filter_map_ok(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone())))
255                    .map(|entry| entry.map_err(|e| e.into()));
256                return Box::new(it);
257            }
258        } else if let Some(c2) = &self.c2 {
259            if self.c0.is_empty() && self.c1.as_ref().is_none_or(|c1| c1.is_empty()) {
260                let mapped_start_bound: std::ops::Bound<KeyVec> = match range.start_bound() {
261                    Bound::Included(end) => Bound::Included(K::create_key(end)),
262                    Bound::Excluded(end) => Bound::Excluded(K::create_key(end)),
263                    Bound::Unbounded => Bound::Unbounded,
264                };
265
266                let mapped_end_bound: std::ops::Bound<KeyVec> = match range.end_bound() {
267                    Bound::Included(end) => Bound::Included(K::create_key(end)),
268                    Bound::Excluded(end) => Bound::Excluded(K::create_key(end)),
269                    Bound::Unbounded => Bound::Unbounded,
270                };
271
272                // Return iterator over C2
273                return Box::new(SimplifiedRange::new(
274                    mapped_start_bound,
275                    mapped_end_bound,
276                    c2,
277                    self.serialization,
278                ));
279            }
280        } else {
281            // Neither C1 nor C2 exist:
282            // return range iterator over all C0 entries, but skip the tombestone entries
283            let it = self
284                .c0
285                .range(range)
286                .filter_map(|(k, v)| v.as_ref().map(|v| Ok((k.clone(), v.clone()))));
287            return Box::new(it);
288        }
289        // Use a combined iterator as default
290        match CombinedRange::new(
291            range,
292            &self.c0,
293            self.c1.as_ref(),
294            self.c2.as_ref(),
295            self.serialization,
296        ) {
297            Ok(result) => Box::new(result),
298            Err(e) => Box::new(std::iter::once(Err(e))),
299        }
300    }
301
302    pub fn is_empty(&self) -> Result<bool> {
303        if self.c0.is_empty() && self.c1.is_none() && self.c2.is_none() {
304            return Ok(true);
305        }
306        let mut it = self.iter()?;
307        Ok(it.next().is_none())
308    }
309
310    pub fn clear(&mut self) {
311        self.c0.clear();
312        self.c1 = None;
313        self.c2 = None;
314    }
315
316    pub fn write_to(&self, location: &Path) -> Result<()> {
317        // Make sure the parent directory exist
318        if let Some(parent) = location.parent() {
319            std::fs::create_dir_all(parent)?;
320        }
321        // Open file as writable
322        let out_file = std::fs::OpenOptions::new()
323            .write(true)
324            .read(true)
325            .create(true)
326            .truncate(true)
327            .open(location)?;
328        let mut builder = TableBuilder::new(custom_options(self.block_cache_capacity), out_file);
329        for entry in self.iter()? {
330            let (key, value) = entry?;
331            let key = key.create_key();
332            let value = Some(value);
333            builder.add(&key, &self.serialization.serialize(&value)?)?;
334        }
335        builder.finish()?;
336
337        Ok(())
338    }
339
340    /// Compact the existing disk tables and the in-memory table to a single temporary disk table.
341    pub fn compact(&mut self) -> Result<()> {
342        debug!("Evicting C0 and merging it with existing C1 to a temporary file");
343
344        if self.c1.is_none() {
345            let c1 = BtreeIndex::with_capacity(self.c1_btree_config.clone(), self.c0.len())?;
346            self.c1 = Some(c1);
347        }
348
349        if let Some(c1) = self.c1.as_mut() {
350            let mut c0 = BTreeMap::new();
351            std::mem::swap(&mut self.c0, &mut c0);
352            for (k, v) in c0.into_iter() {
353                c1.insert(k, v)?;
354            }
355        }
356
357        debug!("Finished evicting C0");
358        Ok(())
359    }
360
361    fn evict_c0_if_necessary(&mut self) -> Result<()> {
362        let evict_c0 = match self.eviction_strategy {
363            EvictionStrategy::MaximumItems(n) => self.c0.len() >= n,
364        };
365
366        if evict_c0 {
367            self.compact()?;
368        }
369
370        Ok(())
371    }
372}
373
/// Implements an optimized iterator over all entries of a single disk table.
struct SingleTableIterator<K, V> {
    /// Iterator over the raw key/value byte pairs of the table.
    table_iterator: TableIterator,
    /// Bincode configuration used to deserialize the values.
    serialization: bincode::config::DefaultOptions,
    /// Ties the key and value types to the iterator without storing them.
    phantom: std::marker::PhantomData<(K, V)>,
}
380
381impl<K, V> Iterator for SingleTableIterator<K, V>
382where
383    for<'de> K: 'static + Clone + KeySerializer + Send,
384    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
385{
386    type Item = Result<(K, V)>;
387
388    fn next(&mut self) -> Option<Self::Item> {
389        while let Some((key, value)) = self.table_iterator.next() {
390            let key = match K::parse_key(&key) {
391                Ok(key) => key,
392                Err(e) => return Some(Err(e.into())),
393            };
394            let value: Option<V> = match self.serialization.deserialize(&value) {
395                Ok(value) => value,
396                Err(e) => return Some(Err(e.into())),
397            };
398            if let Some(value) = value {
399                return Some(Ok((key, value)));
400            }
401        }
402        None
403    }
404}
405
// Marks the iterator as fused, i.e. it keeps returning `None` after the first `None`.
// NOTE(review): this relies on the wrapped table iterator to keep returning `None`
// once it is exhausted — confirm against the sstable implementation.
impl<K, V> FusedIterator for SingleTableIterator<K, V>
where
    K: 'static + Clone + KeySerializer + Send,
    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
{
}
412
/// A boxed iterator over key/value pairs where each item can also be an error.
type ResultIterator<'a, K, V> = Box<dyn Iterator<Item = Result<(K, V)>> + 'a>;
414
/// An iterator that merges the entries of the C0, C1 and C2 levels of a disk map
/// for a given key range.
///
/// For keys that exist in several levels, the entry of the newest level wins;
/// tombstone entries (`None` values in C0 and C1) hide the key completely.
pub struct CombinedRange<'a, K, V>
where
    for<'de> K: 'static + Clone + KeySerializer + Send,
    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
{
    /// Entries of the in-memory map (C0), including tombstones.
    c0_iterator: Peekable<std::collections::btree_map::Range<'a, K, Option<V>>>,
    /// Entries of the BTree disk index (C1), including tombstones.
    c1_iterator: Peekable<ResultIterator<'a, K, Option<V>>>,
    /// Entries of the disk table (C2); tombstones are already filtered out.
    c2_iterator: Peekable<ResultIterator<'a, K, V>>,
}
424
425impl<'a, K, V> CombinedRange<'a, K, V>
426where
427    for<'de> K: 'static + Clone + KeySerializer + Serialize + Deserialize<'de> + Send + Sync + Ord,
428    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send + Sync,
429{
430    fn new<R: RangeBounds<K> + Clone>(
431        range: R,
432        c0: &'a BTreeMap<K, Option<V>>,
433        c1: Option<&'a BtreeIndex<K, Option<V>>>,
434        c2: Option<&Table>,
435        serialization: bincode::config::DefaultOptions,
436    ) -> Result<CombinedRange<'a, K, V>> {
437        let c1_iterator: Box<dyn Iterator<Item = Result<(K, Option<V>)>>> = if let Some(c1) = c1 {
438            let it = c1
439                .range(range.clone())?
440                .map(|entry| entry.map_err(|e| e.into()));
441            Box::new(it)
442        } else {
443            Box::new(std::iter::empty())
444        };
445
446        let c2: Box<dyn Iterator<Item = Result<(K, V)>>> = if let Some(c2) = c2 {
447            let table_start_bound = match range.start_bound() {
448                Bound::Included(end) => Bound::Included(K::create_key(end)),
449                Bound::Excluded(end) => Bound::Excluded(K::create_key(end)),
450                Bound::Unbounded => Bound::Unbounded,
451            };
452
453            let table_end_bound: std::ops::Bound<KeyVec> = match range.end_bound() {
454                Bound::Included(end) => Bound::Included(K::create_key(end)),
455                Bound::Excluded(end) => Bound::Excluded(K::create_key(end)),
456                Bound::Unbounded => Bound::Unbounded,
457            };
458
459            let it = SimplifiedRange::new(table_start_bound, table_end_bound, c2, serialization);
460            Box::new(it)
461        } else {
462            Box::new(std::iter::empty())
463        };
464
465        Ok(CombinedRange {
466            c0_iterator: c0.range(range).peekable(),
467            c1_iterator: c1_iterator.peekable(),
468            c2_iterator: c2.peekable(),
469        })
470    }
471}
472
473impl<K, V> Iterator for CombinedRange<'_, K, V>
474where
475    K: Ord,
476    for<'de> K: 'static + Clone + KeySerializer + Send,
477    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
478{
479    type Item = Result<(K, V)>;
480
481    fn next(&mut self) -> Option<Self::Item> {
482        while self.c0_iterator.peek().is_some()
483            || self.c1_iterator.peek().is_some()
484            || self.c2_iterator.peek().is_some()
485        {
486            // Get keys from all iterators and determine which is the smallest one
487            let c0 = self.c0_iterator.peek().map(|(k, _v)| Some(*k));
488            let c1 = self.c1_iterator.peek().map(|entry| match entry {
489                Ok((k, _v)) => Some(k),
490                Err(_) => None,
491            });
492            let c3 = self.c2_iterator.peek().map(|entry| match entry {
493                Ok((k, _v)) => Some(k),
494                Err(_) => None,
495            });
496
497            let min_key = vec![c0, c1, c3].into_iter().flatten().min();
498            if let Some(min_key) = min_key {
499                let c0_is_min = c0 == Some(min_key);
500                let c1_is_min = c1 == Some(min_key);
501                let c3_is_min = c3 == Some(min_key);
502
503                // Advance all iterators with the same (minimal) key
504                let c0 = if c0_is_min {
505                    self.c0_iterator.next()
506                } else {
507                    None
508                };
509                let c1 = if c1_is_min {
510                    self.c1_iterator.next()
511                } else {
512                    None
513                };
514                let c3 = if c3_is_min {
515                    self.c2_iterator.next()
516                } else {
517                    None
518                };
519
520                // Output the value from the most recent map
521                if let Some((k, v)) = c0 {
522                    if let Some(v) = v {
523                        return Some(Ok((k.clone(), v.clone())));
524                    } else {
525                        // Value was explicitly deleted, do not check the other maps
526                        continue;
527                    }
528                } else if let Some(entry) = c1 {
529                    match entry {
530                        Ok((k, v)) => {
531                            if let Some(v) = v {
532                                return Some(Ok((k, v)));
533                            } else {
534                                // Value was explicitly deleted, do not check the other maps
535                                continue;
536                            }
537                        }
538                        Err(e) => {
539                            return Some(Err(e));
540                        }
541                    };
542                } else if let Some(entry) = c3 {
543                    match entry {
544                        Ok((k, v)) => {
545                            return Some(Ok((k, v)));
546                        }
547                        Err(e) => {
548                            return Some(Err(e));
549                        }
550                    }
551                }
552            }
553        }
554        None
555    }
556}
557
// Marks the iterator as fused, i.e. it keeps returning `None` after the first `None`.
// NOTE(review): `next` returns `None` only when all three peeked iterators are
// exhausted, so this relies on the wrapped iterators staying exhausted — confirm.
impl<K, V> FusedIterator for CombinedRange<'_, K, V>
where
    K: 'static + Ord + Clone + KeySerializer + Serialize + DeserializeOwned + Send,
    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
{
}
564
565impl<K, V> Default for DiskMap<K, V>
566where
567    K: 'static + Ord + Clone + KeySerializer + Serialize + DeserializeOwned + Send + Sync,
568    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send + Sync,
569{
570    fn default() -> Self {
571        DiskMap::new_temporary(
572            EvictionStrategy::default(),
573            DEFAULT_BLOCK_CACHE_CAPACITY,
574            BtreeConfig::default(),
575        )
576    }
577}
578
/// An iterator over the entries of a single disk table whose serialized keys
/// are inside a given range.
///
/// It is used directly when the disk table is the only level with entries and
/// as the disk table part of the combined range iterator.
struct SimplifiedRange<K, V> {
    /// Lower bound of the range as serialized key.
    range_start: Bound<KeyVec>,
    /// Upper bound of the range as serialized key.
    range_end: Bound<KeyVec>,
    /// Iterator over the table, positioned at the next candidate entry.
    table_it: TableIterator,
    /// Set once it is known that no further entry is inside the range.
    exhausted: bool,
    /// Bincode configuration used to deserialize the values.
    serialization: bincode::config::DefaultOptions,

    /// Buffer that receives the raw key of the current entry.
    current_key: Vec<u8>,
    /// Buffer that receives the raw value of the current entry.
    current_value: Vec<u8>,

    /// Ties the key and value types to the iterator without storing them.
    phantom: std::marker::PhantomData<(K, V)>,
}
592
593impl<K, V> SimplifiedRange<K, V>
594where
595    for<'de> K: 'static + Clone + KeySerializer + Send,
596    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
597{
598    fn new(
599        range_start: Bound<KeyVec>,
600        range_end: Bound<KeyVec>,
601        disk_table: &Table,
602        serialization: bincode::config::DefaultOptions,
603    ) -> SimplifiedRange<K, V> {
604        let mut table_it = disk_table.iter();
605        let mut exhausted = false;
606
607        // Initialize the table iterators
608        match &range_start {
609            Bound::Included(start) => {
610                let start: &[u8] = start;
611                let mut key = Vec::default();
612                let mut value = Vec::default();
613
614                table_it.seek(start);
615
616                if table_it.valid() && table_it.current(&mut key, &mut value) {
617                    let key: &[u8] = &key;
618                    // Check if the seeked element is actually part of the range
619                    let start_included = match &range_start {
620                        Bound::Included(start) => {
621                            let start: &[u8] = start;
622                            key >= start
623                        }
624                        Bound::Excluded(start) => {
625                            let start: &[u8] = start;
626                            key > start
627                        }
628                        Bound::Unbounded => true,
629                    };
630                    let end_included = match &range_end {
631                        Bound::Included(end) => {
632                            let end: &[u8] = end;
633                            key <= end
634                        }
635                        Bound::Excluded(end) => {
636                            let end: &[u8] = end;
637                            key < end
638                        }
639                        Bound::Unbounded => true,
640                    };
641                    if !start_included || !end_included {
642                        exhausted = true;
643                    }
644                } else {
645                    // Seeked behind last element
646                    exhausted = true;
647                }
648            }
649            Bound::Excluded(start_bound) => {
650                let start_bound: &[u8] = start_bound;
651
652                let mut key: Vec<u8> = Vec::default();
653                let mut value = Vec::default();
654
655                table_it.seek(start_bound);
656                if table_it.valid() && table_it.current(&mut key, &mut value) {
657                    let key: &[u8] = &key;
658                    if key == start_bound {
659                        // We need to exclude the first match
660                        table_it.advance();
661                    }
662                }
663
664                // Check key after advance
665                if table_it.valid() && table_it.current(&mut key, &mut value) {
666                    let key: &[u8] = &key;
667
668                    // Check if the seeked element is actually part of the range
669                    let start_included = match &range_start {
670                        Bound::Included(start) => {
671                            let start: &[u8] = start;
672                            key >= start
673                        }
674                        Bound::Excluded(start) => {
675                            let start: &[u8] = start;
676                            key > start
677                        }
678                        Bound::Unbounded => true,
679                    };
680                    let end_included = match &range_end {
681                        Bound::Included(end) => {
682                            let end: &[u8] = end;
683                            key <= end
684                        }
685                        Bound::Excluded(end) => {
686                            let end: &[u8] = end;
687                            key < end
688                        }
689                        Bound::Unbounded => true,
690                    };
691                    if !start_included || !end_included {
692                        exhausted = true;
693                    }
694                } else {
695                    // Seeked behind last element
696                    exhausted = true;
697                }
698            }
699            Bound::Unbounded => {
700                table_it.seek_to_first();
701
702                if !table_it.valid() {
703                    exhausted = true;
704                }
705            }
706        };
707
708        SimplifiedRange {
709            range_start,
710            range_end,
711            exhausted,
712            table_it,
713            serialization,
714            current_key: Vec::new(),
715            current_value: Vec::new(),
716            phantom: std::marker::PhantomData,
717        }
718    }
719
720    fn range_contains(&self, item: &[u8]) -> bool {
721        (match &self.range_start {
722            Bound::Included(start) => start.as_slice() <= item,
723            Bound::Excluded(start) => start.as_slice() < item,
724            Bound::Unbounded => true,
725        }) && (match &self.range_end {
726            Bound::Included(end) => item <= end.as_ref(),
727            Bound::Excluded(end) => item < end.as_ref(),
728            Bound::Unbounded => true,
729        })
730    }
731}
732
733impl<K, V> Iterator for SimplifiedRange<K, V>
734where
735    for<'de> K: 'static + Clone + KeySerializer + Send,
736    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
737{
738    type Item = Result<(K, V)>;
739
740    fn next(&mut self) -> Option<Self::Item> {
741        while !self.exhausted && self.table_it.valid() {
742            if self
743                .table_it
744                .current(&mut self.current_key, &mut self.current_value)
745            {
746                if self.range_contains(&self.current_key) {
747                    let value: Option<V> = match self.serialization.deserialize(&self.current_value)
748                    {
749                        Ok(value) => value,
750                        Err(e) => return Some(Err(e.into())),
751                    };
752
753                    self.table_it.advance();
754
755                    if let Some(value) = value {
756                        let key = match K::parse_key(&self.current_key) {
757                            Ok(key) => key,
758                            Err(e) => return Some(Err(e.into())),
759                        };
760                        return Some(Ok((key, value)));
761                    }
762                } else {
763                    self.exhausted = true;
764                }
765            }
766        }
767        None
768    }
769}
770
// Marks the iterator as fused, i.e. it keeps returning `None` after the first `None`.
// The `exhausted` flag is never reset once it has been set.
// NOTE(review): when `next` stops because the table iterator is no longer valid,
// this relies on the table iterator staying invalid — confirm.
impl<K, V> FusedIterator for SimplifiedRange<K, V>
where
    K: 'static + Clone + KeySerializer + Send,
    for<'de> V: 'static + Clone + Serialize + Deserialize<'de> + Send,
{
}
777
778#[cfg(test)]
779mod tests;