locustdb 0.3.4

Embeddable high-performance analytics database.
Documentation
use crate::bitvec::*;
use crate::engine::*;


#[derive(Debug)]
pub struct MergeKeep<T> {
    pub merge_ops: BufferRef<u8>,
    pub left: BufferRef<T>,
    pub right: BufferRef<T>,
    pub merged: BufferRef<T>,
}

impl<'a, T: VecData<T> + 'a> VecOperator<'a> for MergeKeep<T> {
    fn execute(&mut self, _: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError>{
        let merged = {
            let ops = scratchpad.get(self.merge_ops);
            let left = scratchpad.get(self.left);
            let right = scratchpad.get(self.right);
            merge_keep(&ops, &left, &right)
        };
        scratchpad.set(self.merged, merged);
        Ok(())
    }

    fn inputs(&self) -> Vec<BufferRef<Any>> { vec![self.merge_ops.any(), self.left.any(), self.right.any()] }
    fn outputs(&self) -> Vec<BufferRef<Any>> { vec![self.merged.any()] }
    fn can_stream_input(&self, _: usize) -> bool { false }
    fn can_stream_output(&self, _: usize) -> bool { false }
    fn allocates(&self) -> bool { true }

    fn display_op(&self, _: bool) -> String {
        format!("merge_keep({}, {}, {})", self.merge_ops, self.left, self.right)
    }
}

fn merge_keep<'a, T: Copy + 'a>(ops: &[u8], left: &[T], right: &[T]) -> Vec<T> {
    let mut result = Vec::with_capacity(ops.len());
    let mut i = 0;
    let mut j = 0;
    for take_left in ops {
        if *take_left == 1 {
            result.push(left[i]);
            i += 1;
        } else {
            result.push(right[j]);
            j += 1;
        }
    }
    result
}

#[derive(Debug)]
pub struct MergeKeepNullable<T> {
    pub merge_ops: BufferRef<u8>,
    pub left: BufferRef<Nullable<T>>,
    pub right: BufferRef<Nullable<T>>,
    pub merged: BufferRef<Nullable<T>>,
}

impl<'a, T: VecData<T> + 'a> VecOperator<'a> for MergeKeepNullable<T> {
    fn execute(&mut self, _: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError>{
        let (merged, merged_present) = {
            let ops = scratchpad.get(self.merge_ops);
            let (left, left_present) = scratchpad.get_nullable(self.left);
            let (right, right_present) = scratchpad.get_nullable(self.right);
            merge_keep_nullable(&ops, &left, &right, &left_present, &right_present)
        };
        scratchpad.set_nullable(self.merged, merged, merged_present);
        Ok(())
    }

    fn inputs(&self) -> Vec<BufferRef<Any>> { vec![self.merge_ops.any(), self.left.any(), self.right.any()] }
    fn outputs(&self) -> Vec<BufferRef<Any>> { vec![self.merged.any()] }
    fn can_stream_input(&self, _: usize) -> bool { false }
    fn can_stream_output(&self, _: usize) -> bool { false }
    fn allocates(&self) -> bool { true }

    fn display_op(&self, _: bool) -> String {
        format!("merge_keep_nullable({}, {}, {})", self.merge_ops, self.left, self.right)
    }
}

fn merge_keep_nullable<'a, T: Copy + 'a>(ops: &[u8],
                                         left: &[T],
                                         right: &[T],
                                         left_present: &[u8],
                                         right_present: &[u8]) -> (Vec<T>, Vec<u8>) {
    let mut result = Vec::with_capacity(ops.len());
    let mut present = Vec::with_capacity(ops.len() / 8);
    let mut i = 0;
    let mut j = 0;
    for take_left in ops {
        if *take_left == 1 {
            result.push(left[i]);
            if left_present.is_set(i) { present.set(i + j); }
            i += 1;
        } else {
            result.push(right[j]);
            if right_present.is_set(j) { present.set(i + j); }
            j += 1;
        }
    }
    (result, present)
}