use super::with_internal_tracking;
use hyperloglog::HyperLogLog;
use rhai::Dynamic;
use tdigests::TDigest;
/// Error rate handed to `HyperLogLog::new_deterministic` by `new_hll` when the
/// caller does not pick one.
const HLL_DEFAULT_ERROR_RATE: f64 = 0.01;
/// Fixed seed passed to `HyperLogLog::new_deterministic` for every sketch built
/// in this module. The hex digits are the ASCII bytes of `kelora_hll_seed`.
const HLL_SEED: u128 = 0x6b656c6f72615f686c6c5f73656564;
/// Four-byte prefix that `serialize_hll` writes in front of the JSON payload
/// and that `deserialize_hll` / `is_hll_blob` check for.
// NOTE(review): the trailing `\x01` looks like a format version byte — confirm.
const HLL_MAGIC: &[u8; 4] = b"HLL\x01";
/// Maps an internal operation identifier to the name shown in messages.
///
/// Known identifiers are translated to their `track_*` display name; anything
/// else is returned unchanged.
pub(crate) fn op_display_name(op: &str) -> &str {
    // (internal identifier, display name) pairs.
    const DISPLAY_NAMES: &[(&str, &str)] = &[
        ("sum", "track_sum"),
        ("count", "track_stats (count)"),
        ("avg", "track_avg"),
        ("min", "track_min"),
        ("max", "track_max"),
        ("unique", "track_unique"),
        ("bucket", "track_freq"),
        ("cardinality", "track_cardinality"),
        ("percentiles", "track_percentiles"),
        ("top", "track_top"),
        ("bottom", "track_bottom"),
        ("top_by", "track_top_by"),
        ("bottom_by", "track_bottom_by"),
    ];

    DISPLAY_NAMES
        .iter()
        .find(|(internal, _)| *internal == op)
        .map_or(op, |(_, display)| *display)
}
/// Records which operation owns the metric `key`, or verifies that the
/// previously recorded owner matches `operation`.
///
/// The owner is stored in the internal tracking map under `__op_{key}`.
///
/// # Errors
///
/// Returns an error when `__op_{key}` already holds a value that differs from
/// `operation`. A stored value that is not a string is read as the empty
/// string for this comparison.
pub(super) fn ensure_operation_metadata(
    key: &str,
    operation: &str,
) -> Result<(), Box<rhai::EvalAltResult>> {
    with_internal_tracking(|internal| {
        let op_key = format!("__op_{}", key);

        match internal.get(&op_key) {
            // First use of this metric name: remember which operation claimed it.
            None => {
                internal.insert(op_key, Dynamic::from(operation.to_string()));
            }
            Some(recorded) => {
                let recorded_op = recorded
                    .clone()
                    .into_immutable_string()
                    .unwrap_or_default();
                if recorded_op != operation {
                    let message = format!(
                        "metric '{}' is already tracked by {}; each metric name can be used by only one track function (use a different name for {})",
                        key,
                        op_display_name(&recorded_op),
                        op_display_name(operation)
                    );
                    return Err(message.into());
                }
            }
        }

        Ok(())
    })
}
/// Bumps the skip counter stored under `__kelora_track_skipped_{key}` in the
/// internal tracking map.
///
/// On first use the counter is created with value `1`, and a matching
/// `__op_`-prefixed entry is inserted with the value `"count"`. An existing
/// entry that cannot be read as an integer is treated as `0` before the
/// increment.
pub(super) fn record_skipped_unit(key: &str) {
    with_internal_tracking(|internal| {
        let skip_key = format!("__kelora_track_skipped_{}", key);

        match internal.get_mut(&skip_key) {
            Some(counter) => {
                let previous = counter.as_int().unwrap_or(0);
                *counter = Dynamic::from(previous + 1);
            }
            None => {
                let op_key = format!("__op_{}", skip_key);
                internal.insert(op_key, Dynamic::from("count"));
                internal.insert(skip_key, Dynamic::from(1_i64));
            }
        }
    });
}
/// Adds `new_value` to an optional running total.
///
/// * With no `existing` value, `new_value` is returned untouched.
/// * If both operands are non-float, they are added as integers. Should that
///   addition overflow, the sum is computed in floating point instead of
///   panicking (debug builds) or wrapping around (release builds).
/// * If either operand is a float, both are converted to float and added.
///
/// An operand that cannot be read as the expected numeric type contributes
/// zero to the sum.
pub(super) fn merge_numeric(existing: Option<Dynamic>, new_value: Dynamic) -> Dynamic {
    let current = match existing {
        Some(value) => value,
        None => return new_value,
    };

    if !current.is_float() && !new_value.is_float() {
        let lhs = current.as_int().unwrap_or(0);
        let rhs = new_value.as_int().unwrap_or(0);
        return match lhs.checked_add(rhs) {
            Some(total) => Dynamic::from(total),
            // Integer range exhausted: keep an approximate total rather than
            // a wrapped (wrong-signed) one.
            None => Dynamic::from(lhs as f64 + rhs as f64),
        };
    }

    // At least one side is a float, so the result is a float.
    let as_f64 = |value: &Dynamic| -> f64 {
        if value.is_float() {
            value.as_float().unwrap_or(0.0)
        } else {
            value.as_int().unwrap_or(0) as f64
        }
    };
    Dynamic::from(as_f64(&current) + as_f64(&new_value))
}
/// Encodes the centroids of `digest` into a flat little-endian byte buffer.
///
/// Layout:
/// * bytes `0..8`: number of centroids as a `u64`
/// * then, per centroid, 16 bytes: `mean` (`f64`) followed by `weight` (`f64`)
///
/// The count is written as an explicit `u64` so the header is always eight
/// bytes wide, which is what `deserialize_tdigest` reads, regardless of the
/// platform's pointer width.
pub(super) fn serialize_tdigest(digest: &TDigest) -> Vec<u8> {
    let centroids = digest.centroids();
    let mut bytes = Vec::with_capacity(8 + centroids.len() * 16);

    // Widening cast: `usize` is never wider than 64 bits on supported targets.
    bytes.extend_from_slice(&(centroids.len() as u64).to_le_bytes());
    for centroid in centroids {
        bytes.extend_from_slice(&centroid.mean.to_le_bytes());
        bytes.extend_from_slice(&centroid.weight.to_le_bytes());
    }
    bytes
}
pub(super) fn deserialize_tdigest(bytes: &[u8]) -> Option<TDigest> {
if bytes.len() < 8 {
return None;
}
let count = usize::from_le_bytes(bytes[0..8].try_into().ok()?);
if bytes.len() < 8 + count * 16 {
return None;
}
let mut centroids = Vec::with_capacity(count);
for i in 0..count {
let offset = 8 + i * 16;
let mean = f64::from_le_bytes(bytes[offset..offset + 8].try_into().ok()?);
let weight = f64::from_le_bytes(bytes[offset + 8..offset + 16].try_into().ok()?);
centroids.push(tdigests::Centroid::new(mean, weight));
}
Some(TDigest::from_centroids(centroids))
}
pub(super) fn serialize_hll(hll: &HyperLogLog) -> Vec<u8> {
let mut bytes = Vec::new();
bytes.extend_from_slice(HLL_MAGIC);
if let Ok(json) = serde_json::to_vec(hll) {
bytes.extend_from_slice(&json);
}
bytes
}
/// Decodes a blob written by `serialize_hll`.
///
/// Returns `None` when `bytes` does not start with `HLL_MAGIC` or when the
/// remainder is not valid JSON for a `HyperLogLog`.
pub(super) fn deserialize_hll(bytes: &[u8]) -> Option<HyperLogLog> {
    let payload = bytes.strip_prefix(&HLL_MAGIC[..])?;
    serde_json::from_slice(payload).ok()
}
/// Reports whether `bytes` begins with the `HLL_MAGIC` prefix.
///
/// Only the prefix is inspected; the payload after it is not validated.
pub(super) fn is_hll_blob(bytes: &[u8]) -> bool {
    bytes.starts_with(HLL_MAGIC)
}
/// Builds an empty sketch using `HLL_DEFAULT_ERROR_RATE` and the fixed
/// `HLL_SEED`.
pub(super) fn new_hll() -> HyperLogLog {
    let error_rate = HLL_DEFAULT_ERROR_RATE;
    let seed = HLL_SEED;
    HyperLogLog::new_deterministic(error_rate, seed)
}
/// Builds an empty sketch with a caller-chosen `error_rate` and the fixed
/// `HLL_SEED`.
pub(super) fn new_hll_with_error(error_rate: f64) -> HyperLogLog {
    let seed = HLL_SEED;
    HyperLogLog::new_deterministic(error_rate, seed)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Merges two integers through `merge_numeric` and unwraps the integer result.
    fn merged_int(current: i64, incoming: i64) -> i64 {
        merge_numeric(Some(Dynamic::from(current)), Dynamic::from(incoming))
            .as_int()
            .unwrap()
    }

    /// Asserts that `actual` is within 0.001 of `expected`.
    fn assert_close(actual: f64, expected: f64) {
        assert!((actual - expected).abs() < 0.001);
    }

    #[test]
    fn test_merge_numeric_integers() {
        assert_eq!(merged_int(5, 3), 8);
    }

    #[test]
    fn test_merge_numeric_floats() {
        let merged = merge_numeric(Some(Dynamic::from(5.5f64)), Dynamic::from(3.2f64));
        assert_close(merged.as_float().unwrap(), 8.7);
    }

    #[test]
    fn test_merge_numeric_mixed_int_and_float() {
        let merged = merge_numeric(Some(Dynamic::from(5i64)), Dynamic::from(3.5f64));
        assert_close(merged.as_float().unwrap(), 8.5);
    }

    #[test]
    fn test_merge_numeric_no_existing() {
        let merged = merge_numeric(None, Dynamic::from(42i64));
        assert_eq!(merged.as_int().unwrap(), 42);
    }

    #[test]
    fn test_merge_numeric_edge_case_zero_plus_zero() {
        assert_eq!(merged_int(0, 0), 0);
    }

    #[test]
    fn test_merge_numeric_edge_case_negative_numbers() {
        assert_eq!(merged_int(-5, -3), -8);
    }

    #[test]
    fn test_merge_numeric_edge_case_large_integers() {
        assert_eq!(
            merged_int(1_000_000_000i64, 2_000_000_000i64),
            3_000_000_000i64
        );
    }

    #[test]
    fn test_hll_serialization_roundtrip() {
        let mut original = new_hll();
        for user in ["user1", "user2", "user3"].iter() {
            original.insert(user);
        }

        let blob = serialize_hll(&original);
        assert!(is_hll_blob(&blob));

        let restored = deserialize_hll(&blob).unwrap();
        assert_eq!(restored.len(), original.len());
    }
}