kip-sql 0.0.1-alpha.8

build the SQL layer of KipDB database
Documentation
use crate::execution::codegen::CodeGenerator;
use crate::execution::volcano::dql::join::hash_join::HashJoinStatus;
use crate::execution::ExecutorError;
use crate::impl_from_lua;
use crate::planner::operator::join::JoinOperator;
use crate::types::tuple::Tuple;
use mlua::prelude::{LuaResult, LuaValue};
use mlua::{FromLua, Lua, UserData, UserDataMethods, Value};
use std::mem;

pub struct HashJoin {
    pub(crate) id: i64,
    join_status: Option<HashJoinStatus>,
    is_produced: bool,

    env: String,
}

impl From<(JoinOperator, i64, String)> for HashJoin {
    fn from((JoinOperator { on, join_type }, id, env): (JoinOperator, i64, String)) -> Self {
        HashJoin {
            id,
            join_status: Some(HashJoinStatus::new(on, join_type)),
            is_produced: false,
            env,
        }
    }
}

impl UserData for HashJoinStatus {
    fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
        methods.add_method_mut("left_build", |_, join_status, tuple: Tuple| {
            join_status.left_build(tuple).unwrap();

            Ok(())
        });
        methods.add_method_mut("right_probe", |_, join_status, tuple: Tuple| {
            Ok(join_status.right_probe(tuple).unwrap())
        });
        methods.add_method_mut("drop_build", |_, join_status, ()| {
            Ok(join_status.build_drop())
        });
    }
}

impl_from_lua!(HashJoinStatus);

impl CodeGenerator for HashJoin {
    fn produce(&mut self, lua: &Lua, script: &mut String) -> Result<(), ExecutorError> {
        if let Some(join_status) = self.join_status.take() {
            lua.globals().set(self.env.as_str(), join_status)?;

            script.push_str(
                format!(
                    r#"
            for _, tuple in ipairs({}:drop_build()) do
                table.insert(join_temp_{}, tuple)
            end

            for index, tuple in ipairs(join_temp_{}) do
                index = index - 1
            "#,
                    self.env, self.id, self.id
                )
                .as_str(),
            );

            self.is_produced = true;
        }

        Ok(())
    }

    fn consume(&mut self, _: &Lua, script: &mut String) -> Result<(), ExecutorError> {
        if mem::replace(&mut self.is_produced, false) {
            script.push_str(
                r#"
                table.insert(results, tuple)
                ::continue::
            end
            "#,
            );
        }

        Ok(())
    }
}