use crate::numeric::Numeric;
use crate::Value;
use std::collections::{BTreeMap, HashMap};
use std::hash::{Hash, Hasher};
/// A 128-bit hash value, stored as a `uuid::Uuid`.
///
/// The type is `Copy` and derives equality, ordering and hashing from the
/// wrapped UUID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Hash128 {
// The 16 raw hash bytes, carried in a UUID container.
uuid: uuid::Uuid,
}
impl Hash128 {
    /// Builds a hash from two 64-bit halves.
    ///
    /// `high` fills bytes 0..8 and `low` fills bytes 8..16, both big-endian.
    pub fn new(high: u64, low: u64) -> Self {
        let combined = (u128::from(high) << 64) | u128::from(low);
        Self {
            uuid: uuid::Uuid::from_bytes(combined.to_be_bytes()),
        }
    }

    /// Returns the low 8 bytes (bytes 8..16) read as a big-endian integer and
    /// reinterpreted as `i64`; the upper 8 bytes are ignored.
    pub fn as_i64(&self) -> i64 {
        let mut low_half = [0u8; 8];
        low_half.copy_from_slice(&self.uuid.as_bytes()[8..16]);
        i64::from_be_bytes(low_half)
    }

    /// Hashes a sequence of values.
    ///
    /// Each value is rendered as a type tag plus payload (`N:` for null,
    /// `I:` + decimal integer, `F:` + decimal of the `f64` bit pattern,
    /// `T:` + raw text, `B:` + hex of the blob); consecutive values are
    /// separated by a NUL character and the resulting string is passed to
    /// [`Hash128::hash_str`].
    ///
    /// NOTE(review): text is appended verbatim, so a text value that itself
    /// contains NUL followed by a tag can encode the same as several separate
    /// values — confirm whether callers rely on the encoding being unambiguous.
    pub fn hash_values(values: &[Value]) -> Self {
        // Appends the tagged rendering of a single value to `out`.
        fn append_tagged(out: &mut String, value: &Value) {
            match value {
                Value::Null => out.push_str("N:"),
                Value::Numeric(Numeric::Integer(n)) => {
                    out.push_str("I:");
                    out.push_str(&n.to_string());
                }
                Value::Numeric(Numeric::Float(f)) => {
                    out.push_str("F:");
                    out.push_str(&f64::from(*f).to_bits().to_string());
                }
                Value::Text(t) => {
                    out.push_str("T:");
                    out.push_str(t.as_str());
                }
                Value::Blob(b) => {
                    out.push_str("B:");
                    out.push_str(&hex::encode(b));
                }
            }
        }

        let mut encoded = String::new();
        let mut remaining = values.iter();
        if let Some(first) = remaining.next() {
            append_tagged(&mut encoded, first);
            // Every value after the first is preceded by the NUL separator.
            for value in remaining {
                encoded.push('\x00');
                append_tagged(&mut encoded, value);
            }
        }
        Self::hash_str(&encoded)
    }

    /// Hashes a string by deriving a name-based (version 5) UUID from its
    /// bytes under the DNS namespace.
    pub fn hash_str(s: &str) -> Self {
        Self {
            uuid: uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_DNS, s.as_bytes()),
        }
    }

    /// Returns the 16 hash bytes as an owned vector.
    pub fn to_blob(self) -> Vec<u8> {
        let raw: &[u8] = self.uuid.as_bytes();
        raw.to_vec()
    }

    /// Rebuilds a hash from exactly 16 bytes; any other length yields `None`.
    pub fn from_blob(bytes: &[u8]) -> Option<Self> {
        match bytes.len() {
            16 => {
                let mut raw = [0u8; 16];
                raw.copy_from_slice(bytes);
                Some(Self {
                    uuid: uuid::Uuid::from_bytes(raw),
                })
            }
            _ => None,
        }
    }

    /// Wraps the 16 hash bytes in a `Value::Blob`.
    pub fn to_value(self) -> Value {
        Value::Blob(self.to_blob())
    }

    /// Extracts a hash from a `Value::Blob` holding exactly 16 bytes;
    /// returns `None` for any other value.
    pub fn from_value(value: &Value) -> Option<Self> {
        if let Value::Blob(b) = value {
            Self::from_blob(b)
        } else {
            None
        }
    }
}
/// Formats the hash using the wrapped UUID's own `Display` output.
impl std::fmt::Display for Hash128 {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{}", self.uuid))
    }
}
/// A row made of a `rowid` and its column `values`, carrying a hash that is
/// computed once at construction.
///
/// NOTE(review): `rowid` and `values` are public, but `cached_hash` is only
/// computed in `HashableRow::new`; mutating either field afterwards would
/// leave the cached hash out of date.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HashableRow {
/// Identifier of the row.
pub rowid: i64,
/// Column values of the row.
pub values: Vec<Value>,
// Hash of `rowid` followed by `values`, computed in `new`.
cached_hash: Hash128,
}
impl HashableRow {
    /// Creates a row and computes its hash up front.
    pub fn new(rowid: i64, values: Vec<Value>) -> Self {
        Self {
            // Evaluated first, while `values` is still only borrowed.
            cached_hash: Self::compute_hash(rowid, &values),
            rowid,
            values,
        }
    }

    /// Hashes the rowid (as an integer value) followed by all column values.
    fn compute_hash(rowid: i64, values: &[Value]) -> Hash128 {
        let keyed: Vec<Value> = std::iter::once(Value::from_i64(rowid))
            .chain(values.iter().cloned())
            .collect();
        Hash128::hash_values(&keyed)
    }

    /// Returns the hash computed when the row was constructed.
    pub fn cached_hash(&self) -> Hash128 {
        self.cached_hash
    }
}
/// Hashes a row through its precomputed [`Hash128`] only.
///
/// Equal rows have equal cached hashes (the derived `PartialEq` compares
/// `cached_hash` too), so this stays consistent with `Eq`.
impl Hash for HashableRow {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // `Hash128` is `Copy` and derives `Hash`, so feed it to the hasher
        // directly rather than allocating a temporary `Vec<u8>` per call.
        self.cached_hash.hash(state);
    }
}
/// Partial ordering that always agrees with the total order from `Ord`.
impl PartialOrd for HashableRow {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Orders rows by `rowid` first; rows sharing a rowid are ordered by their
/// `values`. The cached hash takes no part in the ordering.
impl Ord for HashableRow {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.rowid
            .cmp(&other.rowid)
            .then_with(|| self.values.cmp(&other.values))
    }
}
/// One change in a `Delta`: a row paired with its signed weight.
type DeltaEntry = (HashableRow, isize);
/// A list of weighted row changes.
///
/// `Delta::insert` records a row with weight `+1` and `Delta::delete` with
/// weight `-1`; entries are kept in push order until `Delta::consolidate`
/// is called.
#[derive(Debug, Clone, Default)]
pub struct Delta {
/// The recorded `(row, weight)` changes.
pub changes: Vec<DeltaEntry>,
}
impl Delta {
    /// Creates a delta with no changes.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records an insertion of the given row (weight `+1`).
    pub fn insert(&mut self, row_key: i64, values: Vec<Value>) {
        self.changes.push((HashableRow::new(row_key, values), 1));
    }

    /// Records a deletion of the given row (weight `-1`).
    pub fn delete(&mut self, row_key: i64, values: Vec<Value>) {
        self.changes.push((HashableRow::new(row_key, values), -1));
    }

    /// Returns `true` when no changes are recorded.
    pub fn is_empty(&self) -> bool {
        self.changes.is_empty()
    }

    /// Number of recorded change entries (not the number of distinct rows).
    pub fn len(&self) -> usize {
        self.changes.len()
    }

    /// Appends clones of all of `other`'s changes after this delta's own.
    pub fn merge(&mut self, other: &Delta) {
        self.changes.extend_from_slice(&other.changes);
    }

    /// Sums the weights of identical rows and drops rows whose total is zero.
    ///
    /// The surviving entries come out of a `HashMap`, so their order after
    /// consolidation is unspecified.
    pub fn consolidate(&mut self) {
        if self.changes.is_empty() {
            return;
        }
        let mut totals: HashMap<HashableRow, isize> = HashMap::new();
        for (row, weight) in std::mem::take(&mut self.changes) {
            *totals.entry(row).or_default() += weight;
        }
        self.changes = totals
            .into_iter()
            .filter(|entry| entry.1 != 0)
            .collect();
    }
}
/// Two deltas carried together as a `left` and a `right` side.
#[derive(Debug, Clone, Default)]
pub struct DeltaPair {
/// The left-hand delta.
pub left: Delta,
/// The right-hand delta.
pub right: Delta,
}
impl DeltaPair {
pub fn new(left: Delta, right: Delta) -> Self {
Self { left, right }
}
}
/// Converts a single delta into a pair: the delta becomes `left` and
/// `right` starts out empty.
impl From<Delta> for DeltaPair {
    fn from(delta: Delta) -> Self {
        Self::new(delta, Delta::new())
    }
}
/// Converts a borrowed delta into a pair: a clone of the delta becomes
/// `left` and `right` starts out empty.
impl From<&Delta> for DeltaPair {
    fn from(delta: &Delta) -> Self {
        Self::new(delta.clone(), Delta::new())
    }
}
/// An ordered Z-set: a map from items to signed weights.
///
/// Entries are held in a `BTreeMap`, so iteration visits items in ascending
/// `Ord` order. `insert` never leaves an entry with weight zero behind.
#[derive(Clone, Debug, Default)]
pub struct SimpleZSet<T> {
    data: BTreeMap<T, isize>,
}

#[allow(dead_code)]
impl<T: std::hash::Hash + Eq + Ord + Clone> SimpleZSet<T> {
    /// Creates an empty Z-set.
    pub fn new() -> Self {
        Self {
            data: BTreeMap::new(),
        }
    }

    /// Adds `weight` to the weight of `item`.
    ///
    /// An entry whose accumulated weight becomes zero is removed, and a zero
    /// weight for an absent item stores nothing.
    pub fn insert(&mut self, item: T, weight: isize) {
        match self.data.entry(item) {
            std::collections::btree_map::Entry::Occupied(mut slot) => {
                let updated = *slot.get() + weight;
                if updated == 0 {
                    slot.remove();
                } else {
                    *slot.get_mut() = updated;
                }
            }
            std::collections::btree_map::Entry::Vacant(slot) => {
                if weight != 0 {
                    slot.insert(weight);
                }
            }
        }
    }

    /// Iterates over `(item, weight)` pairs in ascending item order.
    pub fn iter(&self) -> impl Iterator<Item = (&T, isize)> {
        self.data.iter().map(|(item, weight)| (item, *weight))
    }

    /// Returns clones of the items whose weight is strictly positive, in
    /// ascending order (each item once, regardless of its weight).
    pub fn to_vec(&self) -> Vec<T> {
        let mut positives = Vec::new();
        for (item, weight) in self.data.iter() {
            if *weight > 0 {
                positives.push(item.clone());
            }
        }
        positives
    }

    /// Adds every `(item, weight)` of `other` into this set via `insert`.
    pub fn merge(&mut self, other: &SimpleZSet<T>) {
        other
            .iter()
            .for_each(|(item, weight)| self.insert(item.clone(), weight));
    }

    /// Returns the weight of `item`, or `0` when it is absent.
    pub fn get(&self, item: &T) -> isize {
        self.data.get(item).map_or(0, |weight| *weight)
    }

    /// Returns the smallest item and its weight, if any.
    pub fn first(&self) -> Option<(&T, isize)> {
        self.iter().next()
    }

    /// Returns the largest item and its weight, if any.
    pub fn last(&self) -> Option<(&T, isize)> {
        self.data
            .iter()
            .next_back()
            .map(|(item, weight)| (item, *weight))
    }

    /// Iterates over the `(item, weight)` pairs whose item lies in `range`.
    pub fn range<R>(&self, range: R) -> impl Iterator<Item = (&T, isize)> + '_
    where
        R: std::ops::RangeBounds<T>,
    {
        self.data
            .range(range)
            .map(|(item, weight)| (item, *weight))
    }

    /// Returns `true` when the set holds no entries.
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Number of distinct items currently stored.
    pub fn len(&self) -> usize {
        self.data.len()
    }
}
/// Key type used for row-keyed Z-sets; an alias for `HashableRow`.
pub type RowKey = HashableRow;
/// A `SimpleZSet` whose items are rows.
pub type RowKeyZSet = SimpleZSet<RowKey>;
impl RowKeyZSet {
    /// Builds a Z-set by inserting a clone of every `(row, weight)` change of
    /// `delta`; weights of identical rows accumulate and rows that cancel to
    /// zero are absent from the result.
    pub fn from_delta(delta: &Delta) -> Self {
        let mut zset = Self::new();
        for (row, weight) in &delta.changes {
            zset.insert(row.clone(), *weight);
        }
        zset
    }

    /// Finds the rowid selected by `op` relative to `target` and returns
    /// clones of every entry stored under that rowid, with their weights.
    ///
    /// * `GT` — smallest rowid strictly greater than `target`
    /// * `GE { eq_only: false }` — smallest rowid greater than or equal to `target`
    /// * `LT` — largest rowid strictly less than `target`
    /// * `LE { eq_only: false }` — largest rowid less than or equal to `target`
    /// * `GE`/`LE` with `eq_only: true` — exactly `target`
    ///
    /// Returns an empty vector when no rowid qualifies.
    pub fn seek(&self, target: i64, op: crate::types::SeekOp) -> Vec<(HashableRow, isize)> {
        use crate::types::SeekOp;

        // `HashableRow`'s `Ord` compares `rowid` first, so the map's keys are
        // visited in ascending rowid order. The first match scanning forward
        // is therefore the minimum, and the first match scanning backward is
        // the maximum, which lets every search stop at its first hit.
        let rowids = || self.data.keys().map(|row| row.rowid);
        let best_rowid = match op {
            SeekOp::GT => rowids().find(|&rowid| rowid > target),
            SeekOp::GE { eq_only: false } => rowids().find(|&rowid| rowid >= target),
            SeekOp::GE { eq_only: true } | SeekOp::LE { eq_only: true } => {
                rowids().find(|&rowid| rowid == target)
            }
            SeekOp::LT => rowids().rev().find(|&rowid| rowid < target),
            SeekOp::LE { eq_only: false } => rowids().rev().find(|&rowid| rowid <= target),
        };

        match best_rowid {
            // Entries sharing a rowid are adjacent in key order, so skip the
            // smaller rowids and stop as soon as the run of matches ends.
            Some(best) => self
                .data
                .iter()
                .skip_while(|(row, _)| row.rowid < best)
                .take_while(|(row, _)| row.rowid == best)
                .map(|(row, &weight)| (row.clone(), weight))
                .collect(),
            None => Vec::new(),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Merging adds weights item by item; an item whose weights cancel
    /// disappears from the set.
    #[test]
    fn test_zset_merge_with_weights() {
        let mut base = SimpleZSet::new();
        base.insert(1, 1);
        base.insert(2, 1);

        let mut incoming = SimpleZSet::new();
        incoming.insert(2, -1);
        incoming.insert(3, 1);

        base.merge(&incoming);

        // Item 2 had +1 and -1, so only items 1 and 3 remain.
        assert_eq!(base.iter().count(), 2);
        assert!(base.iter().any(|(item, _)| *item == 1));
        assert!(base.iter().any(|(item, _)| *item == 3));
        assert!(!base.iter().any(|(item, _)| *item == 2));
    }

    /// An update expressed as delete + insert leaves the item with weight 1.
    #[test]
    fn test_zset_represents_updates_as_delete_plus_insert() {
        let mut zset = SimpleZSet::new();
        zset.insert(1, 1);
        // Delete followed by re-insert of the same item.
        zset.insert(1, -1);
        zset.insert(1, 1);

        let weight = zset
            .iter()
            .find_map(|(item, weight)| if *item == 1 { Some(weight) } else { None });
        assert_eq!(weight, Some(1));
    }

    /// Insert, delete and re-insert with new values consolidate down to the
    /// single surviving row.
    #[test]
    fn test_hashable_row_delta_operations() {
        let old_values = || vec![Value::from_i64(1), Value::from_i64(100)];
        let new_values = || vec![Value::from_i64(1), Value::from_i64(200)];

        let mut delta = Delta::new();
        delta.insert(1, old_values());
        assert_eq!(delta.len(), 1);

        delta.delete(1, old_values());
        delta.insert(1, new_values());
        assert_eq!(delta.len(), 3);

        // Before consolidation the raw entries keep their push order.
        let ops: Vec<_> = delta.changes.iter().collect();
        assert_eq!(ops[0].1, 1);
        assert_eq!(ops[1].1, -1);
        assert_eq!(ops[2].1, 1);

        delta.consolidate();
        assert_eq!(delta.len(), 1);

        let final_row = &delta.changes[0];
        assert_eq!(final_row.0.rowid, 1);
        assert_eq!(final_row.0.values, new_values());
        assert_eq!(final_row.1, 1);
    }

    /// Inserting the same row twice consolidates into one entry of weight 2.
    #[test]
    fn test_duplicate_row_consolidation() {
        let mut delta = Delta::new();
        for _ in 0..2 {
            delta.insert(2, vec![Value::from_i64(2), Value::from_i64(300)]);
        }
        assert_eq!(delta.len(), 2);

        delta.consolidate();
        assert_eq!(delta.len(), 1);

        let final_row = &delta.changes[0];
        assert_eq!(final_row.0.rowid, 2);
        assert_eq!(final_row.1, 2);
    }
}