1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
use crate::bitvec::*;
use crate::engine::*;


#[derive(Debug)]
pub struct MergeKeep<T> {
    pub merge_ops: BufferRef<u8>,
    pub left: BufferRef<T>,
    pub right: BufferRef<T>,
    pub merged: BufferRef<T>,
}

impl<'a, T: VecData<T> + 'a> VecOperator<'a> for MergeKeep<T> {
    fn execute(&mut self, _: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError>{
        let merged = {
            let ops = scratchpad.get(self.merge_ops);
            let left = scratchpad.get(self.left);
            let right = scratchpad.get(self.right);
            merge_keep(&ops, &left, &right)
        };
        scratchpad.set(self.merged, merged);
        Ok(())
    }

    fn inputs(&self) -> Vec<BufferRef<Any>> { vec![self.merge_ops.any(), self.left.any(), self.right.any()] }
    fn outputs(&self) -> Vec<BufferRef<Any>> { vec![self.merged.any()] }
    fn can_stream_input(&self, _: usize) -> bool { false }
    fn can_stream_output(&self, _: usize) -> bool { false }
    fn allocates(&self) -> bool { true }

    fn display_op(&self, _: bool) -> String {
        format!("merge_keep({}, {}, {})", self.merge_ops, self.left, self.right)
    }
}

fn merge_keep<'a, T: Copy + 'a>(ops: &[u8], left: &[T], right: &[T]) -> Vec<T> {
    let mut result = Vec::with_capacity(ops.len());
    let mut i = 0;
    let mut j = 0;
    for take_left in ops {
        if *take_left == 1 {
            result.push(left[i]);
            i += 1;
        } else {
            result.push(right[j]);
            j += 1;
        }
    }
    result
}

#[derive(Debug)]
pub struct MergeKeepNullable<T> {
    pub merge_ops: BufferRef<u8>,
    pub left: BufferRef<Nullable<T>>,
    pub right: BufferRef<Nullable<T>>,
    pub merged: BufferRef<Nullable<T>>,
}

impl<'a, T: VecData<T> + 'a> VecOperator<'a> for MergeKeepNullable<T> {
    fn execute(&mut self, _: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError>{
        let (merged, merged_present) = {
            let ops = scratchpad.get(self.merge_ops);
            let (left, left_present) = scratchpad.get_nullable(self.left);
            let (right, right_present) = scratchpad.get_nullable(self.right);
            merge_keep_nullable(&ops, &left, &right, &left_present, &right_present)
        };
        scratchpad.set_nullable(self.merged, merged, merged_present);
        Ok(())
    }

    fn inputs(&self) -> Vec<BufferRef<Any>> { vec![self.merge_ops.any(), self.left.any(), self.right.any()] }
    fn outputs(&self) -> Vec<BufferRef<Any>> { vec![self.merged.any()] }
    fn can_stream_input(&self, _: usize) -> bool { false }
    fn can_stream_output(&self, _: usize) -> bool { false }
    fn allocates(&self) -> bool { true }

    fn display_op(&self, _: bool) -> String {
        format!("merge_keep_nullable({}, {}, {})", self.merge_ops, self.left, self.right)
    }
}

fn merge_keep_nullable<'a, T: Copy + 'a>(ops: &[u8],
                                         left: &[T],
                                         right: &[T],
                                         left_present: &[u8],
                                         right_present: &[u8]) -> (Vec<T>, Vec<u8>) {
    let mut result = Vec::with_capacity(ops.len());
    let mut present = Vec::with_capacity(ops.len() / 8);
    let mut i = 0;
    let mut j = 0;
    for take_left in ops {
        if *take_left == 1 {
            result.push(left[i]);
            if left_present.is_set(i) { present.set(i + j); }
            i += 1;
        } else {
            result.push(right[j]);
            if right_present.is_set(j) { present.set(i + j); }
            j += 1;
        }
    }
    (result, present)
}