1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use crate::values::ValueLog;
use crate::sorted_table::TableIterator;
use crate::memtable::MemtableIterator;
use crate::KV_Trait;

use bincode::Options;

use std::future::Future;
use futures::stream::Stream;
use std::task::{Context, Poll};

use std::marker::PhantomData;
use std::sync::Arc;
use std::pin::Pin;

use crate::sync_iter::DbIterator as SyncIter;

type IterFuture<K, V> = dyn Future<Output = (DbIteratorInner<K,V>, Option<(K,V)>)>+Send;

pub struct DbIterator<K: KV_Trait, V: KV_Trait> {
    state: Option<Pin<Box<IterFuture<K,V>>>>
}

impl<K: KV_Trait, V: KV_Trait> DbIterator<K, V> {
    pub(crate) fn new(mem_iters: Vec<MemtableIterator>, table_iters: Vec<TableIterator>,
                      value_log: Arc<ValueLog>) -> Self {
        let inner = DbIteratorInner::new(mem_iters, table_iters, value_log);
        let state = Box::pin(DbIteratorInner::next(inner));

        Self{ state: Some(state) }
    }
}

impl<K: KV_Trait, V: KV_Trait> Stream for DbIterator<K, V> {
    type Item = (K, V);

    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
        let (inner, res) = if let Some(mut fut) = self.state.take() {
            match Future::poll(fut.as_mut(), ctx) {
                // return and keep waiting for result
                Poll::Pending => {
                    self.state = Some(fut);
                    return Poll::Pending
                }
                // item computation complete
                Poll::Ready((inner, res)) => (inner, res),
            }
        } else {
            // no items left
            return Poll::Ready(None)
        };

        // Prepare next state?
        if res.is_some() {
            self.state = Some(Box::pin(DbIteratorInner::next(inner)));
        } else {
            self.state = None;
        }

        // return item
        Poll::Ready(res)
    }
}

struct DbIteratorInner<K: KV_Trait, V: KV_Trait> {
    _marker: PhantomData<fn(K,V)>,

    last_key: Option<Vec<u8>>,
    mem_iters: Vec<MemtableIterator>,
    table_iters: Vec<TableIterator>,
    value_log: Arc<ValueLog>,
}

impl<K: KV_Trait, V: KV_Trait> DbIteratorInner<K, V> {
    fn new(mem_iters: Vec<MemtableIterator>, table_iters: Vec<TableIterator>
            , value_log: Arc<ValueLog>) -> Self {
        Self{
            mem_iters, table_iters, value_log,
            last_key: None, _marker: PhantomData
        }
    }

    async fn next(mut slf: Self) -> (Self, Option<(K, V)>) {
        let mut min_kv = None;
        let mut is_pending = true;

        for iter in slf.mem_iters.iter_mut() {
            SyncIter::<K,V>::parse_iter(&slf.last_key, iter, &mut min_kv);
        }

        for iter in slf.table_iters.iter_mut() {
            if SyncIter::<K,V>::parse_iter(&slf.last_key, iter, &mut min_kv) {
                is_pending = false;
            }
        }

        if let Some((key, entry)) = min_kv {
            let res_key = super::get_encoder().deserialize(&key).unwrap();
            let val_ref = &entry.value_ref;

            let res_val = if is_pending {
                slf.value_log.get_pending(*val_ref).await
            } else {
                slf.value_log.get(*val_ref).await
            };

            slf.last_key = Some(key.clone());

            (slf, Some((res_key, res_val)))
        } else {
            (slf, None)
        }
    }
}