#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortDirection {
Asc,
Desc,
}
#[derive(Debug, Clone)]
pub struct SortColumn {
pub name: String,
pub direction: SortDirection,
}
#[derive(Debug, Clone)]
pub struct SortKeyEncoder {
columns: Vec<SortColumn>,
}
impl SortKeyEncoder {
pub fn new(columns: Vec<SortColumn>) -> Self {
Self { columns }
}
pub fn column_count(&self) -> usize {
self.columns.len()
}
pub fn columns(&self) -> &[SortColumn] {
&self.columns
}
pub fn encode(&self, values: &[&[u8]]) -> Vec<u8> {
debug_assert_eq!(values.len(), self.columns.len());
let total_len: usize = values.iter().map(|v| 4 + v.len()).sum();
let mut key = Vec::with_capacity(total_len);
for (value, col) in values.iter().zip(&self.columns) {
let len = value.len() as u32;
key.extend_from_slice(&len.to_be_bytes());
match col.direction {
SortDirection::Asc => {
key.extend_from_slice(value);
}
SortDirection::Desc => {
for &b in *value {
key.push(!b);
}
}
}
}
key
}
pub fn encode_i64(value: i64) -> [u8; 8] {
let unsigned = (value as u64) ^ (1u64 << 63);
unsigned.to_be_bytes()
}
pub fn decode_i64(bytes: &[u8; 8]) -> i64 {
let unsigned = u64::from_be_bytes(*bytes);
(unsigned ^ (1u64 << 63)) as i64
}
pub fn encode_f64(value: f64) -> [u8; 8] {
let bits = value.to_bits();
let encoded = if bits & (1u64 << 63) == 0 {
bits ^ (1u64 << 63)
} else {
!bits
};
encoded.to_be_bytes()
}
pub fn decode_f64(bytes: &[u8; 8]) -> f64 {
let encoded = u64::from_be_bytes(*bytes);
let bits = if encoded & (1u64 << 63) != 0 {
encoded ^ (1u64 << 63)
} else {
!encoded
};
f64::from_bits(bits)
}
pub fn encode_timestamp_ms(ts_ms: u64) -> [u8; 8] {
ts_ms.to_be_bytes()
}
pub fn decode_timestamp_ms(bytes: &[u8; 8]) -> u64 {
u64::from_be_bytes(*bytes)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn i64_roundtrip() {
for v in [i64::MIN, -1, 0, 1, i64::MAX] {
let encoded = SortKeyEncoder::encode_i64(v);
assert_eq!(SortKeyEncoder::decode_i64(&encoded), v);
}
}
#[test]
fn i64_ordering() {
let values = [i64::MIN, -1000, -1, 0, 1, 1000, i64::MAX];
let encoded: Vec<_> = values
.iter()
.map(|v| SortKeyEncoder::encode_i64(*v))
.collect();
for i in 0..encoded.len() - 1 {
assert!(
encoded[i] < encoded[i + 1],
"encode_i64({}) should be < encode_i64({})",
values[i],
values[i + 1]
);
}
}
#[test]
fn f64_roundtrip() {
for v in [f64::NEG_INFINITY, -1.0, -0.0, 0.0, 1.0, f64::INFINITY] {
let encoded = SortKeyEncoder::encode_f64(v);
let decoded = SortKeyEncoder::decode_f64(&encoded);
assert_eq!(v.to_bits(), decoded.to_bits(), "roundtrip failed for {v}");
}
}
#[test]
fn f64_ordering() {
let values = [
f64::NEG_INFINITY,
-100.0,
-0.001,
0.0,
0.001,
100.0,
f64::INFINITY,
];
let encoded: Vec<_> = values
.iter()
.map(|v| SortKeyEncoder::encode_f64(*v))
.collect();
for i in 0..encoded.len() - 1 {
assert!(
encoded[i] < encoded[i + 1],
"encode_f64({}) should be < encode_f64({})",
values[i],
values[i + 1]
);
}
}
#[test]
fn composite_key_desc_score_asc_time() {
let encoder = SortKeyEncoder::new(vec![
SortColumn {
name: "score".into(),
direction: SortDirection::Desc,
},
SortColumn {
name: "updated_at".into(),
direction: SortDirection::Asc,
},
]);
let score_100 = SortKeyEncoder::encode_i64(100);
let score_200 = SortKeyEncoder::encode_i64(200);
let time_early = SortKeyEncoder::encode_timestamp_ms(1000);
let time_late = SortKeyEncoder::encode_timestamp_ms(2000);
let key_200_early = encoder.encode(&[&score_200, &time_early]);
let key_100_early = encoder.encode(&[&score_100, &time_early]);
let key_200_late = encoder.encode(&[&score_200, &time_late]);
assert!(key_200_early < key_100_early);
assert!(key_200_early < key_200_late);
}
#[test]
fn composite_key_asc_score() {
let encoder = SortKeyEncoder::new(vec![SortColumn {
name: "score".into(),
direction: SortDirection::Asc,
}]);
let score_50 = SortKeyEncoder::encode_i64(50);
let score_150 = SortKeyEncoder::encode_i64(150);
let key_50 = encoder.encode(&[&score_50]);
let key_150 = encoder.encode(&[&score_150]);
assert!(key_50 < key_150);
}
#[test]
fn timestamp_roundtrip_and_ordering() {
let ts1 = 1714600000000u64;
let ts2 = 1714600001000u64;
let e1 = SortKeyEncoder::encode_timestamp_ms(ts1);
let e2 = SortKeyEncoder::encode_timestamp_ms(ts2);
assert_eq!(SortKeyEncoder::decode_timestamp_ms(&e1), ts1);
assert_eq!(SortKeyEncoder::decode_timestamp_ms(&e2), ts2);
assert!(e1 < e2);
}
}