kip-sql 0.0.1-alpha.8

build the SQL layer of KipDB database
Documentation
use crate::execution::volcano::{BoxedExecutor, Executor};
use crate::execution::ExecutorError;
use crate::planner::operator::limit::LimitOperator;
use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use futures::StreamExt;
use futures_async_stream::try_stream;
use std::cell::RefCell;

pub struct Limit {
    offset: Option<usize>,
    limit: Option<usize>,
    input: BoxedExecutor,
}

impl From<(LimitOperator, BoxedExecutor)> for Limit {
    fn from((LimitOperator { offset, limit }, input): (LimitOperator, BoxedExecutor)) -> Self {
        Limit {
            offset,
            limit,
            input,
        }
    }
}

impl<T: Transaction> Executor<T> for Limit {
    fn execute(self, _transaction: &RefCell<T>) -> BoxedExecutor {
        self._execute()
    }
}

impl Limit {
    #[try_stream(boxed, ok = Tuple, error = ExecutorError)]
    pub async fn _execute(self) {
        let Limit {
            offset,
            limit,
            input,
        } = self;

        if limit.is_some() && limit.unwrap() == 0 {
            return Ok(());
        }

        let offset_val = offset.unwrap_or(0);
        let offset_limit = offset_val + limit.unwrap_or(1) - 1;

        #[for_await]
        for (i, tuple) in input.enumerate() {
            if i < offset_val {
                continue;
            } else if i > offset_limit {
                break;
            }

            yield tuple?;
        }
    }
}