use crate::error::Error::IllegalArgument;
use crate::error::Result;
use crate::metadata::RowType;
use crate::row::binary::ValueWriter;
use crate::row::compacted::CompactedKeyWriter;
use crate::row::encode::KeyEncoder;
use crate::row::field_getter::FieldGetter;
use crate::row::{Datum, InternalRow};
use bytes::Bytes;
// NOTE(review): the reason for `allow(dead_code)` is not visible in this file — confirm it is still needed.
/// Key encoder that reads the configured key columns from a row and writes
/// them, in order, through a [`CompactedKeyWriter`].
#[allow(dead_code)]
pub struct CompactedKeyEncoder {
/// One getter per key column, in the order the columns are encoded.
field_getters: Vec<FieldGetter>,
/// One value writer per key column; filled in lockstep with `field_getters`
/// by `new`, so both vectors share the same indexes.
field_encoders: Vec<ValueWriter>,
/// Writer that is reset at the start of every `encode_key` call and whose
/// contents are returned as the encoded key.
compacted_encoder: CompactedKeyWriter,
}
impl CompactedKeyEncoder {
    /// Creates an encoder for the key columns named in `keys`.
    ///
    /// Every name is resolved to its field index in `row_type`; the resulting
    /// indexes, in the order of `keys`, are handed to [`CompactedKeyEncoder::new`].
    ///
    /// # Errors
    ///
    /// Returns `IllegalArgument` when a name in `keys` is not a field of
    /// `row_type`, and forwards any error produced by [`CompactedKeyEncoder::new`].
    pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> Result<CompactedKeyEncoder> {
        // Collecting into `Result` stops at the first unknown key name.
        let encode_col_indexes = keys
            .iter()
            .map(|key| {
                row_type.get_field_index(key).ok_or_else(|| IllegalArgument {
                    message: format!("Field {key:?} not found in input row type {row_type:?}"),
                })
            })
            .collect::<Result<Vec<usize>>>()?;
        Self::new(row_type, encode_col_indexes)
    }

    /// Creates an encoder for the fields of `row_type` at the positions in
    /// `encode_field_pos`, encoded in the order given.
    ///
    /// # Errors
    ///
    /// Returns `IllegalArgument` when a position is not a valid field index of
    /// `row_type` (previously this case panicked), and forwards any error from
    /// `CompactedKeyWriter::create_value_writer` for a field's data type.
    pub fn new(row_type: &RowType, encode_field_pos: Vec<usize>) -> Result<CompactedKeyEncoder> {
        let fields = row_type.fields();
        let mut field_getters: Vec<FieldGetter> = Vec::with_capacity(encode_field_pos.len());
        let mut field_encoders: Vec<ValueWriter> = Vec::with_capacity(encode_field_pos.len());

        for &pos in &encode_field_pos {
            // Positions come from the caller, so an invalid one is reported as
            // an argument error instead of aborting the process.
            let field = fields.get(pos).ok_or_else(|| IllegalArgument {
                message: format!(
                    "Key field position {pos} is out of bounds for row type with {} fields",
                    fields.len()
                ),
            })?;
            let data_type = field.data_type();
            field_getters.push(FieldGetter::create(data_type, pos));
            field_encoders.push(CompactedKeyWriter::create_value_writer(data_type)?);
        }

        Ok(CompactedKeyEncoder {
            field_getters,
            field_encoders,
            compacted_encoder: CompactedKeyWriter::new(),
        })
    }
}
#[allow(dead_code)]
impl KeyEncoder for CompactedKeyEncoder {
    /// Encodes the key columns of `row` and returns the resulting bytes.
    ///
    /// The internal writer is reset first; each key column is then read from
    /// `row` and written with its value writer, in key order.
    ///
    /// # Errors
    ///
    /// Returns `IllegalArgument` when a key column holds `Datum::Null`, and
    /// propagates any error from reading or writing a field.
    fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes> {
        self.compacted_encoder.reset();

        // `new` fills both vectors in lockstep, so zipping pairs every getter
        // with the writer stored at the same index.
        let columns = self.field_getters.iter().zip(self.field_encoders.iter());
        for (pos, (getter, writer)) in columns.enumerate() {
            let datum = getter.get_field(row)?;
            if matches!(datum, Datum::Null) {
                return Err(IllegalArgument {
                    message: format!("Cannot encode key with null value at position: {pos:?}"),
                });
            }
            writer.write_value(&mut self.compacted_encoder, pos, &datum)?;
        }

        Ok(self.compacted_encoder.to_bytes())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::DataTypes;
    use crate::row::{Datum, GenericRow};

    /// Builds an encoder that uses every field of `row_type` as a key column.
    pub fn for_test_row_type(row_type: &RowType) -> CompactedKeyEncoder {
        let every_position: Vec<usize> = (0..row_type.fields().len()).collect();
        CompactedKeyEncoder::new(row_type, every_position)
            .expect("CompactedKeyEncoder initialization failed")
    }

    /// The same encoder instance can encode several rows one after another.
    #[test]
    fn test_encode_key() {
        let row_type = RowType::with_data_types(vec![
            DataTypes::int(),
            DataTypes::bigint(),
            DataTypes::int(),
        ]);
        let mut encoder = for_test_row_type(&row_type);

        let first_row = GenericRow::from_data(vec![
            Datum::from(1i32),
            Datum::from(3i64),
            Datum::from(2i32),
        ]);
        let first_key = encoder.encode_key(&first_row).unwrap();
        assert_eq!(first_key.iter().as_slice(), [1u8, 3u8, 2u8]);

        let second_row = GenericRow::from_data(vec![
            Datum::from(2i32),
            Datum::from(5i64),
            Datum::from(6i32),
        ]);
        let second_key = encoder.encode_key(&second_row).unwrap();
        assert_eq!(second_key.iter().as_slice(), [2u8, 5u8, 6u8]);
    }

    /// Key columns can be selected by field name.
    #[test]
    fn test_encode_key_with_key_names() {
        let data_types = vec![
            DataTypes::string(),
            DataTypes::bigint(),
            DataTypes::string(),
        ];
        let field_names = vec!["partition", "f1", "f2"];
        let row_type = RowType::with_data_types_and_field_names(data_types, field_names);

        let primary_keys = &["f2".to_string()];
        let mut encoder =
            CompactedKeyEncoder::create_key_encoder(&row_type, primary_keys).unwrap();

        let row = GenericRow::from_data(vec![
            Datum::from("p1"),
            Datum::from(1i64),
            Datum::from("a2"),
        ]);
        let key = encoder.encode_key(&row).unwrap();
        assert_eq!(key.iter().as_slice(), [2u8, 97u8, 50u8]);
    }

    /// A null value in a key column is rejected; the final `unwrap` turns the
    /// returned error into the expected panic.
    #[test]
    #[should_panic(expected = "Cannot encode key with null value at position: 2")]
    fn test_null_primary_key() {
        let row_type = RowType::with_data_types(vec![
            DataTypes::int(),
            DataTypes::bigint(),
            DataTypes::int(),
            DataTypes::string(),
        ]);
        let primary_key_indices = vec![0, 1, 2];
        let mut encoder = CompactedKeyEncoder::new(&row_type, primary_key_indices)
            .expect("CompactedKeyEncoder initialization failed");

        // Sanity check: a fully populated row encodes without error.
        let valid_row = GenericRow::from_data(vec![
            Datum::from(1i32),
            Datum::from(3i64),
            Datum::from(2i32),
            Datum::from("a2"),
        ]);
        let valid_key = encoder.encode_key(&valid_row).unwrap();
        assert_eq!(valid_key.iter().as_slice(), [1u8, 3u8, 2u8]);

        let row_with_null_key = GenericRow::from_data(vec![
            Datum::from(1i32),
            Datum::from(3i64),
            Datum::Null,
            Datum::from("a2"),
        ]);
        encoder.encode_key(&row_with_null_key).unwrap();
    }

    /// Only the selected positions (an int and a string column) are encoded.
    #[test]
    fn test_int_string_as_primary_key() {
        let row_type = RowType::with_data_types(vec![
            DataTypes::string(),
            DataTypes::int(),
            DataTypes::string(),
            DataTypes::string(),
        ]);
        let primary_key_indices = vec![1, 2];
        let mut encoder = CompactedKeyEncoder::new(&row_type, primary_key_indices)
            .expect("CompactedKeyEncoder initialization failed");

        let row = GenericRow::from_data(vec![
            Datum::from("a1"),
            Datum::from(1i32),
            Datum::from("a2"),
            Datum::from("a3"),
        ]);
        let key = encoder.encode_key(&row).unwrap();
        assert_eq!(key.iter().as_slice(), [1u8, 2u8, 97u8, 50u8]);
    }

    /// Encodes one value of many data types and compares the result with a
    /// fixed byte sequence (the failure message names it the Java reference).
    #[test]
    fn test_all_data_types_java_compatible() {
        use crate::metadata::{DataType, TimestampLTzType, TimestampType};

        let row_type = RowType::with_data_types(vec![
            DataTypes::boolean(),
            DataTypes::tinyint(),
            DataTypes::smallint(),
            DataTypes::int(),
            DataTypes::bigint(),
            DataTypes::float(),
            DataTypes::double(),
            DataTypes::date(),
            DataTypes::time(),
            DataTypes::binary(20),
            DataTypes::bytes(),
            DataTypes::char(2),
            DataTypes::string(),
            DataTypes::decimal(5, 2),
            DataTypes::decimal(20, 0),
            DataType::Timestamp(TimestampType::with_nullable(false, 1).unwrap()),
            DataType::Timestamp(TimestampType::with_nullable(false, 5).unwrap()),
            DataType::TimestampLTz(TimestampLTzType::with_nullable(false, 1).unwrap()),
            DataType::TimestampLTz(TimestampLTzType::with_nullable(false, 5).unwrap()),
        ]);

        let row = GenericRow::from_data(vec![
            Datum::from(true),
            Datum::from(2i8),
            Datum::from(10i16),
            Datum::from(100i32),
            Datum::from(-6101065172474983726i64),
            Datum::from(13.2f32),
            Datum::from(15.21f64),
            Datum::Date(crate::row::datum::Date::new(19655)),
            Datum::Time(crate::row::datum::Time::new(34200000)),
            Datum::from("1234567890".as_bytes()),
            Datum::from("20".as_bytes()),
            Datum::from("1"),
            Datum::from("hello"),
            Datum::Decimal(crate::row::Decimal::from_unscaled_long(9, 5, 2).unwrap()),
            Datum::Decimal(
                crate::row::Decimal::from_big_decimal(
                    bigdecimal::BigDecimal::new(bigdecimal::num_bigint::BigInt::from(10), 0),
                    20,
                    0,
                )
                .unwrap(),
            ),
            Datum::TimestampNtz(crate::row::datum::TimestampNtz::new(1698235273182)),
            Datum::TimestampNtz(crate::row::datum::TimestampNtz::new(1698235273182)),
            Datum::TimestampLtz(crate::row::datum::TimestampLtz::new(1698235273182)),
            Datum::TimestampLtz(crate::row::datum::TimestampLtz::new(1698235273182)),
        ]);

        #[rustfmt::skip]
        let expected: Vec<u8> = vec![
            0x01,
            0x02,
            0x0A,
            0x00, 0x64,
            0xD2, 0x95, 0xFC, 0xD8, 0xCE, 0xB1, 0xAA, 0xAA, 0xAB, 0x01,
            0x33, 0x33, 0x53, 0x41,
            0xEC, 0x51, 0xB8, 0x1E, 0x85, 0x6B, 0x2E, 0x40,
            0xC7, 0x99, 0x01,
            0xC0, 0xB3, 0xA7, 0x10,
            0x0A, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x30,
            0x02, 0x32, 0x30,
            0x01, 0x31,
            0x05, 0x68, 0x65, 0x6C, 0x6C, 0x6F,
            0x09,
            0x01, 0x0A,
            0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31,
            0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31, 0x00,
            0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31,
            0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31, 0x00,
        ];

        let mut encoder = for_test_row_type(&row_type);
        let encoded = encoder.encode_key(&row).unwrap();
        let actual: &[u8] = encoded.iter().as_slice();
        assert_eq!(
            actual,
            expected.as_slice(),
            "\n\nRust encoding does not match Java reference from encoded_key.hex\n\
             Expected: {:02X?}\n\
             Actual: {:02X?}\n",
            expected,
            actual
        );
    }
}