use std::{
cmp::min,
fmt::{self, Debug, Display, Formatter},
mem::size_of,
};
use feldera_macros::IsNone;
use num::PrimInt;
use rkyv::{Archive, Deserialize, Serialize};
use size_of::SizeOf;
use crate::{DBData, dynamic::DynDataTyped, operator::dynamic::time_series::Range};
use super::{RADIX, RADIX_BITS};
#[derive(
Clone,
Debug,
Default,
SizeOf,
PartialEq,
Eq,
Hash,
PartialOrd,
Ord,
Archive,
Serialize,
Deserialize,
IsNone,
)]
#[archive_attr(derive(Eq, PartialEq, PartialOrd, Ord))]
#[archive(compare(PartialEq, PartialOrd))]
pub struct Prefix<TS: DBData> {
pub key: TS,
pub prefix_len: u32,
}
impl<TS> Display for Prefix<TS>
where
TS: DBData + PrimInt,
{
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), fmt::Error> {
if self.prefix_len == Self::KEY_BITS {
write!(f, "{:x?}", self.key.to_u128().unwrap())
} else if self.prefix_len == 0 {
write!(f, "*")
} else {
write!(
f,
"{:x?}/{}",
(self.key >> (Self::KEY_BITS - self.prefix_len) as usize)
.to_u128()
.unwrap(),
self.prefix_len
)
}
}
}
impl<TS> Prefix<TS>
where
TS: DBData + PrimInt,
{
const KEY_BITS: u32 = (size_of::<TS>() * 8) as u32;
#[cfg(test)]
fn new(key: TS, prefix_len: u32) -> Self {
Self { key, prefix_len }
}
pub fn from_timestamp(key: TS) -> Self {
Self {
key,
prefix_len: Self::KEY_BITS,
}
}
pub fn is_leaf(&self) -> bool {
self.prefix_len == Self::KEY_BITS
}
pub fn full_range() -> Self {
Self {
key: TS::zero(),
prefix_len: 0,
}
}
fn upper(&self) -> TS {
self.key | !Self::prefix_mask(self.prefix_len)
}
fn wildcard_bits(prefix_len: u32) -> usize {
(Self::KEY_BITS - prefix_len) as usize
}
fn prefix_mask(prefix_len: u32) -> TS {
if prefix_len == 0 {
TS::zero()
} else {
(TS::max_value() >> Self::wildcard_bits(prefix_len)) << Self::wildcard_bits(prefix_len)
}
}
pub fn longest_common_prefix(&self, key: TS) -> Self {
let longest_common_len = min((key ^ self.key).leading_zeros(), self.prefix_len);
#[allow(clippy::modulo_one)]
let prefix_len = longest_common_len - longest_common_len % RADIX_BITS;
Self {
key: key & Self::prefix_mask(prefix_len),
prefix_len,
}
}
pub fn contains(&self, key: TS) -> bool {
(self.key & Self::prefix_mask(self.prefix_len))
== (key & Self::prefix_mask(self.prefix_len))
}
pub fn slot_of_timestamp(&self, key: TS) -> usize {
debug_assert!(self.prefix_len < Self::KEY_BITS);
debug_assert!(self.contains(key));
((key >> Self::wildcard_bits(self.prefix_len + RADIX_BITS)) & TS::from(RADIX - 1).unwrap())
.to_usize()
.unwrap()
}
pub(crate) fn slot_of(&self, other: &Self) -> usize {
debug_assert!(self.prefix_len < other.prefix_len);
self.slot_of_timestamp(other.key)
}
pub fn extend(&self, slot: usize) -> Self {
debug_assert!(self.prefix_len < Self::KEY_BITS);
debug_assert!(slot < RADIX);
let prefix_len = self.prefix_len + RADIX_BITS;
Self {
key: self.key | (TS::from(slot).unwrap() << Self::wildcard_bits(prefix_len)),
prefix_len,
}
}
pub fn in_range(&self, range: &Range<TS>) -> bool {
range.from <= self.key && range.to >= self.upper()
}
}
pub type DynPrefix<TS> = DynDataTyped<Prefix<TS>>;
#[cfg(test)]
mod test {
use rkyv::{Deserialize, Infallible, archived_root, to_bytes};
use crate::operator::dynamic::time_series::radix_tree::{Prefix, RADIX_BITS};
#[test]
fn test_prefix() {
type TestPrefix = Prefix<u64>;
assert_eq!(TestPrefix::from_timestamp(0), Prefix::new(0, 64));
assert_eq!(
TestPrefix::from_timestamp(u64::MAX),
Prefix::new(u64::MAX, 64)
);
assert_eq!(TestPrefix::full_range(), Prefix::new(0, 0));
assert!(!TestPrefix::full_range().is_leaf());
assert!(TestPrefix::from_timestamp(100).is_leaf());
assert!(!Prefix::new(100, RADIX_BITS * 2).is_leaf());
assert_eq!(TestPrefix::prefix_mask(0), 0);
assert_eq!(TestPrefix::prefix_mask(4), 0xf000_0000_0000_0000);
assert_eq!(TestPrefix::prefix_mask(32), 0xffff_ffff_0000_0000);
assert_eq!(TestPrefix::prefix_mask(64), 0xffff_ffff_ffff_ffff);
assert_eq!(
Prefix::new(0xff00_0000_0000_0000u64, 8).longest_common_prefix(0xffff_ffff_ffff_ffff),
Prefix::new(0xff00_0000_0000_0000u64, 8)
);
assert_eq!(
Prefix::new(0xff00_0000_0000_0000u64, 8).longest_common_prefix(0xf0ff_ffff_ffff_ffff),
Prefix::new(0xf000_0000_0000_0000u64, 4)
);
assert_eq!(
Prefix::new(0xff00_0000_0000_0000u64, 8).longest_common_prefix(0xfcff_ffff_ffff_ffff),
Prefix::new(0xfc00_0000_0000_0000u64, 6)
);
assert_eq!(
Prefix::new(0xff00_0000_0000_0000u64, 8).longest_common_prefix(0x00ff_ffff_ffff_ffff),
Prefix::new(0x0000_0000_0000_0000u64, 0)
);
assert_eq!(
Prefix::new(0xffff_0000_ffff_0000u64, 64).longest_common_prefix(0xffff_0000_ffff_0000),
Prefix::new(0xffff_0000_ffff_0000u64, 64)
);
assert_eq!(
Prefix::new(0x0000_0000_0000_0000u64, 0).upper(),
0xffff_ffff_ffff_ffffu64
);
assert_eq!(
Prefix::new(0xff00_0000_0000_0000u64, 8).upper(),
0xffff_ffff_ffff_ffffu64
);
assert_eq!(
Prefix::new(0x1234_5678_0000_0000u64, 32).upper(),
0x1234_5678_ffff_ffffu64
);
assert_eq!(
Prefix::new(0x1234_5678_0000_1111u64, 64).upper(),
0x1234_5678_0000_1111u64
);
assert!(Prefix::new(0xffff_0000_ffff_0000u64, 64).contains(0xffff_0000_ffff_0000));
assert!(!Prefix::new(0xffff_0000_ffff_0000u64, 64).contains(0xffff_0000_ffff_00ff));
assert!(Prefix::new(0xffff_ffff_0000_0000u64, 32).contains(0xffff_ffff_ffff_ffff));
assert!(!Prefix::new(0xffff_ffff_0000_0000u64, 32).contains(0xffff_0000_ffff_ffff));
assert!(Prefix::new(0x0000_0000_0000_0000u64, 0).contains(0xffff_ffff_ffff_ffff));
assert!(Prefix::new(0x0000_0000_0000_0000u64, 0).contains(0x0000_0000_0000_0000));
assert_eq!(
Prefix::new(0x0000_0000_0000_0000u64, 0).slot_of_timestamp(0x0000_0000_0000_0001),
0
);
assert_eq!(
Prefix::new(0x0000_0000_0000_0000u64, 0).slot_of_timestamp(0x1000_0000_0000_0001),
0
);
assert_eq!(
Prefix::new(0x0000_0000_0000_0000u64, 0).slot_of_timestamp(0xa000_0000_0000_0001),
1
);
assert_eq!(
Prefix::new(0x0000_0000_0000_0000u64, 0).slot_of_timestamp(0xf000_0000_0000_0001),
1
);
assert_eq!(
Prefix::new(0xffff_ffff_0000_0000u64, 32).slot_of_timestamp(0xffff_ffff_0000_0001),
0
);
assert_eq!(
Prefix::new(0xffff_ffff_0000_0000u64, 32).slot_of_timestamp(0xffff_ffff_1000_0001),
0
);
assert_eq!(
Prefix::new(0xffff_ffff_0000_0000u64, 32).slot_of_timestamp(0xffff_ffff_a000_0001),
1
);
assert_eq!(
Prefix::new(0xffff_ffff_0000_0000u64, 32).slot_of_timestamp(0xffff_ffff_f000_0001),
1
);
assert_eq!(
Prefix::new(0x0000_0000_0000_0000u64, 0)
.slot_of(&Prefix::new(0x0000_0000_0000_0000, 4)),
0
);
assert_eq!(
Prefix::new(0x0000_0000_0000_0000u64, 0)
.slot_of(&Prefix::new(0x1000_0000_0000_0000, 8)),
0
);
assert_eq!(
Prefix::new(0x0000_0000_0000_0000u64, 0)
.slot_of(&Prefix::new(0xa000_0000_0000_0000, 16)),
1
);
assert_eq!(
Prefix::new(0x0000_0000_0000_0000u64, 0)
.slot_of(&Prefix::new(0xf000_0000_0000_0000, 32)),
1
);
assert_eq!(
Prefix::new(0xffff_ffff_0000_0000u64, 32)
.slot_of(&Prefix::new(0xffff_ffff_0000_0000, 36)),
0
);
assert_eq!(
Prefix::new(0xffff_ffff_0000_0000u64, 32)
.slot_of(&Prefix::new(0xffff_ffff_1000_0000, 40)),
0
);
assert_eq!(
Prefix::new(0xffff_ffff_0000_0000u64, 32)
.slot_of(&Prefix::new(0xffff_ffff_a000_0000, 44)),
1
);
assert_eq!(
Prefix::new(0xffff_ffff_0000_0000u64, 32)
.slot_of(&Prefix::new(0xffff_ffff_f000_0000, 64)),
1
);
assert_eq!(
Prefix::new(0x0000_0000_0000_0000u64, 0).extend(0),
Prefix::new(0x0000_0000_0000_0000, 1)
);
assert_eq!(
Prefix::new(0x0000_0000_0000_0000u64, 0).extend(1),
Prefix::new(0x8000_0000_0000_0000, 1)
);
assert_eq!(
Prefix::new(0xffff_ffff_0000_0000u64, 32).extend(0),
Prefix::new(0xffff_ffff_0000_0000, 33)
);
assert_eq!(
Prefix::new(0xffff_ffff_0000_0000u64, 32).extend(1),
Prefix::new(0xffff_ffff_8000_0000, 33)
);
assert_eq!(
Prefix::new(0xffff_ffff_0000_0000u64, 60).extend(0),
Prefix::new(0xffff_ffff_0000_0000, 61)
);
assert_eq!(
Prefix::new(0xffff_ffff_0000_0000u64, 60).extend(1),
Prefix::new(0xffff_ffff_0000_0008, 61)
);
assert_eq!(
Prefix::new(0xffff_ffff_0000_0000u64, 63).extend(1),
Prefix::new(0xffff_ffff_0000_0001, 64)
);
}
#[test]
fn prefix_decode_encode() {
type Type = Prefix<u64>;
for input in [
Prefix::new(0xffff_ffff_0000_0000u64, 32),
Prefix::new(0x1234_5678_0000_1111u64, 64),
Prefix::new(u64::MAX, 64),
] {
let input: Type = input;
let encoded = to_bytes::<_, 4096>(&input).unwrap();
let archived = unsafe { archived_root::<Type>(&encoded[..]) };
let decoded: Type = archived.deserialize(&mut Infallible).unwrap();
assert_eq!(decoded, input);
}
}
}