1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use crate::api::checkpoint::CheckpointFunction;
use crate::api::element::Record;
use crate::api::function::{Context, KeySelectorFunction, NamedFunction};
use crate::functions::schema_base::FunctionSchema;

/// Key selector that derives a record's key from a fixed set of columns.
///
/// The selector is configured once with the column indices that make up the
/// key and with the type identifiers of the full record schema; the type
/// identifiers of the key columns are computed at construction time.
#[derive(Debug)]
pub struct SchemaBaseKeySelector {
    /// Type identifiers of the full record schema, indexed by column position.
    field_types: Vec<u8>,
    /// Type identifiers of the key columns only, in the same order as `columns`.
    key_field_types: Vec<u8>,
    /// Indices (into `field_types`) of the columns that form the key.
    columns: Vec<usize>,
}

impl SchemaBaseKeySelector {
    pub fn new(columns: Vec<usize>, data_types: &[u8]) -> Self {
        let key_field_types: Vec<u8> = columns.iter().map(|index| data_types[*index]).collect();
        SchemaBaseKeySelector {
            columns,
            field_types: data_types.to_vec(),
            key_field_types,
        }
    }
}

impl FunctionSchema for SchemaBaseKeySelector {
    /// Returns an owned copy of the key columns' type identifiers, i.e. the
    /// schema of the keys this selector produces.
    fn schema_types(&self) -> Vec<u8> {
        self.key_field_types.to_vec()
    }
}

impl KeySelectorFunction for SchemaBaseKeySelector {
    /// Lifecycle hook; this selector needs no setup, so it always returns `Ok(())`.
    fn open(&mut self, _context: &Context) -> crate::api::Result<()> {
        Ok(())
    }

    /// Builds the key of `record` as a new [`Record`].
    ///
    /// The key record is created with the same capacity as `record.len()`.
    /// Each configured key column is then read from `record` as raw bytes and
    /// written to the key, in the order given by `columns`.
    ///
    /// # Panics
    ///
    /// Panics if reading a column from `record` or writing it to the key
    /// returns an error (both results are unwrapped).
    fn get_key(&self, record: &mut Record) -> Record {
        let mut key = Record::with_capacity(record.len());
        let mut key_writer = key.as_writer(&self.key_field_types[..]);

        let mut source = record.as_reader(&self.field_types[..]);

        for &column in &self.columns {
            let raw = source.get_bytes_raw(column).unwrap();
            key_writer.set_bytes_raw(raw).unwrap();
        }

        key
    }

    /// Lifecycle hook; there is nothing to release, so it always returns `Ok(())`.
    fn close(&mut self) -> crate::api::Result<()> {
        Ok(())
    }
}

impl NamedFunction for SchemaBaseKeySelector {
    /// Returns the fixed name of this function, which matches the type name.
    fn name(&self) -> &str {
        const NAME: &str = "SchemaBaseKeySelector";
        NAME
    }
}

// Empty impl: no `CheckpointFunction` methods are overridden, so the trait's
// default behavior applies.
// NOTE(review): presumably the selector has no state worth checkpointing
// (its fields are fixed at construction) — confirm against the trait's defaults.
impl CheckpointFunction for SchemaBaseKeySelector {}