use crate::stats::ProcessingStats;
use hyperloglog::HyperLogLog;
use rhai::{Dynamic, Engine};
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use tdigests::TDigest;
#[derive(Debug, Clone, Default)]
pub struct TrackingSnapshot {
pub user: HashMap<String, Dynamic>,
pub internal: HashMap<String, Dynamic>,
}
impl TrackingSnapshot {
pub fn from_parts(user: HashMap<String, Dynamic>, internal: HashMap<String, Dynamic>) -> Self {
Self { user, internal }
}
}
thread_local! {
pub static THREAD_TRACKING_STATE: RefCell<TrackingSnapshot> = RefCell::new(TrackingSnapshot::default());
}
pub fn get_thread_snapshot() -> TrackingSnapshot {
THREAD_TRACKING_STATE.with(|state| state.borrow().clone())
}
pub fn with_user_tracking<F, R>(f: F) -> R
where
F: FnOnce(&mut HashMap<String, Dynamic>) -> R,
{
THREAD_TRACKING_STATE.with(|state| {
let mut snapshot = state.borrow_mut();
f(&mut snapshot.user)
})
}
pub fn with_internal_tracking<F, R>(f: F) -> R
where
F: FnOnce(&mut HashMap<String, Dynamic>) -> R,
{
THREAD_TRACKING_STATE.with(|state| {
let mut snapshot = state.borrow_mut();
f(&mut snapshot.internal)
})
}
fn record_operation_metadata(key: &str, operation: &str) {
with_internal_tracking(|internal| {
internal.insert(
format!("__op_{}", key),
Dynamic::from(operation.to_string()),
);
});
}
pub fn set_thread_tracking_state(metrics: &HashMap<String, Dynamic>) {
THREAD_TRACKING_STATE.with(|state| {
let mut snapshot = state.borrow_mut();
snapshot.user = metrics.clone();
});
}
pub fn get_thread_tracking_state() -> HashMap<String, Dynamic> {
THREAD_TRACKING_STATE.with(|state| state.borrow().user.clone())
}
pub fn set_thread_internal_state(metrics: &HashMap<String, Dynamic>) {
THREAD_TRACKING_STATE.with(|state| {
let mut snapshot = state.borrow_mut();
snapshot.internal = metrics.clone();
});
}
pub fn get_thread_internal_state() -> HashMap<String, Dynamic> {
THREAD_TRACKING_STATE.with(|state| state.borrow().internal.clone())
}
fn merge_numeric(existing: Option<Dynamic>, new_value: Dynamic) -> Dynamic {
let new_is_float = new_value.is_float();
if let Some(current) = existing {
let current_is_float = current.is_float();
if current_is_float || new_is_float {
let current_total = if current_is_float {
current.as_float().unwrap_or(0.0)
} else {
current.as_int().unwrap_or(0) as f64
};
let incoming = if new_is_float {
new_value.as_float().unwrap_or(0.0)
} else {
new_value.as_int().unwrap_or(0) as f64
};
Dynamic::from(current_total + incoming)
} else {
let current_total = current.as_int().unwrap_or(0);
let incoming = new_value.as_int().unwrap_or(0);
Dynamic::from(current_total + incoming)
}
} else {
new_value
}
}
fn format_error_location(
line_num: Option<usize>,
filename: Option<&str>,
input_files: &[String],
) -> String {
match (line_num, filename) {
(Some(line), Some(fname)) => {
if input_files.is_empty() || input_files.len() == 1 {
format!("line {}", line)
} else {
let basenames: HashSet<_> = input_files
.iter()
.map(|f| {
Path::new(f)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown")
})
.collect();
if basenames.len() == input_files.len() {
let basename = Path::new(fname)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown");
format!("{}:{}", basename, line)
} else {
format!("{}:{}", fname, line)
}
}
}
(Some(line), None) => format!("line {}", line),
_ => "unknown".to_string(),
}
}
#[allow(clippy::too_many_arguments)]
pub fn track_error(
error_type: &str,
line_num: Option<usize>,
message: &str,
original_line: Option<&str>,
filename: Option<&str>,
verbose: u8,
quiet_level: u8,
config: Option<&crate::pipeline::PipelineConfig>,
format_name: Option<&str>,
) {
with_internal_tracking(|state| {
let count_key = format!("__kelora_error_count_{}", error_type);
let current_count = state
.get(&count_key)
.cloned()
.unwrap_or(Dynamic::from(0i64));
let new_count = current_count.as_int().unwrap_or(0) + 1;
state.insert(count_key.clone(), Dynamic::from(new_count));
state.insert(format!("__op_{}", count_key), Dynamic::from("count"));
if verbose > 0 && quiet_level == 0 {
let use_emoji = if let Some(cfg) = config {
crate::tty::should_use_emoji_with_mode(&cfg.emoji_mode, &cfg.color_mode)
} else {
crate::tty::should_use_emoji_for_stderr()
};
let prefix = if use_emoji { "⚠️ " } else { "kelora: " };
let input_files = config.map(|c| c.input_files.as_slice()).unwrap_or(&[]);
let location = format_error_location(line_num, filename, input_files);
let mut formatted_error = if error_type == "parse" {
let format_info = if let Some(fmt) = format_name {
format!(" (format: {})", fmt)
} else {
String::new()
};
if !location.is_empty() && location != "unknown" {
format!("{}{}{}: {}", prefix, location, format_info, message)
} else {
format!("{}{}{}", prefix, format_info.trim_start(), message)
}
} else {
if !location.is_empty() && location != "unknown" {
format!("{}{}: {} - {}", prefix, location, error_type, message)
} else {
format!("{}{} - {}", prefix, error_type, message)
}
};
if error_type == "parse" && format_name.is_some() && verbose > 0 {
let hint = "\n Hint: Input may contain mixed formats. Consider preprocessing:\n - Split by format: grep '^{' input.log | kelora -f json\n - Use multiline detection: kelora -M 'regex:match=^{' -f json";
formatted_error.push_str(hint);
}
if crate::rhai_functions::strings::is_parallel_mode() {
crate::rhai_functions::strings::capture_stderr(formatted_error.clone());
if verbose >= 2 && error_type == "parse" {
if let Some(line) = original_line {
crate::rhai_functions::strings::capture_stderr(format!(" {}", line));
if verbose >= 3 {
let non_ascii_count = line.chars().filter(|c| !c.is_ascii()).count();
let control_char_count = line
.chars()
.filter(|c| {
c.is_control() && *c != '\t' && *c != '\n' && *c != '\r'
})
.count();
let line_info = format!(" (length: {} chars, non_ascii: {}, control_chars: {}, starts: {:?}, ends: {:?})",
line.len(),
non_ascii_count,
control_char_count,
line.chars().next().unwrap_or('\0'),
line.chars().last().unwrap_or('\0')
);
crate::rhai_functions::strings::capture_stderr(line_info);
}
}
}
} else {
crate::rhai_functions::strings::capture_stderr(formatted_error.clone());
eprintln!("{}", formatted_error);
if verbose >= 2 && error_type == "parse" {
if let Some(line) = original_line {
let indented_line = format!(" {}", line);
crate::rhai_functions::strings::capture_stderr(indented_line.clone());
eprintln!("{}", indented_line);
if verbose >= 3 {
let non_ascii_count = line.chars().filter(|c| !c.is_ascii()).count();
let control_char_count = line
.chars()
.filter(|c| {
c.is_control() && *c != '\t' && *c != '\n' && *c != '\r'
})
.count();
let line_info = format!(" (length: {} chars, non_ascii: {}, control_chars: {}, starts: {:?}, ends: {:?})",
line.len(),
non_ascii_count,
control_char_count,
line.chars().next().unwrap_or('\0'),
line.chars().last().unwrap_or('\0')
);
crate::rhai_functions::strings::capture_stderr(line_info.clone());
eprintln!("{}", line_info);
}
}
}
}
}
let samples_key = format!("__kelora_error_samples_{}", error_type);
let current_samples = state
.get(&samples_key)
.cloned()
.unwrap_or_else(|| Dynamic::from(rhai::Array::new()));
if let Ok(mut arr) = current_samples.into_array() {
if arr.len() < 3 {
let mut sample_obj = rhai::Map::new();
sample_obj.insert("error_type".into(), Dynamic::from(error_type.to_string()));
sample_obj.insert(
"line_num".into(),
Dynamic::from(line_num.unwrap_or(0) as i64),
);
sample_obj.insert("message".into(), Dynamic::from(message.to_string()));
if let Some(line) = original_line {
sample_obj.insert("original_line".into(), Dynamic::from(line.to_string()));
}
if let Some(filename) = filename {
sample_obj.insert("filename".into(), Dynamic::from(filename.to_string()));
}
arr.push(Dynamic::from(sample_obj));
}
state.insert(samples_key.clone(), Dynamic::from(arr));
state.insert(format!("__op_{}", samples_key), Dynamic::from("unique"));
}
});
}
pub fn has_errors_in_tracking(snapshot: &TrackingSnapshot) -> bool {
for (key, value) in &snapshot.internal {
if let Some(_error_type) = key.strip_prefix("__kelora_error_count_") {
if let Ok(count) = value.as_int() {
if count > 0 {
return true;
}
}
}
}
false
}
pub fn format_fatal_error_line(snapshot: &TrackingSnapshot) -> String {
let mut total_errors = 0;
let mut error_types = Vec::new();
let mut all_samples: Vec<rhai::Map> = Vec::new();
for (key, value) in &snapshot.internal {
if let Some(error_type) = key.strip_prefix("__kelora_error_count_") {
if let Ok(count) = value.as_int() {
if count > 0 {
total_errors += count;
error_types.push((error_type.to_string(), count));
}
}
}
}
if total_errors == 0 {
return "fatal error encountered".to_string();
}
for (key, value) in &snapshot.internal {
if let Some(_error_type) = key.strip_prefix("__kelora_error_samples_") {
if let Ok(sample_array) = value.clone().into_array() {
for sample in sample_array {
if let Some(sample_map) = sample.try_cast::<rhai::Map>() {
all_samples.push(sample_map);
}
}
}
}
}
if error_types.len() == 1 {
let (error_type, count) = &error_types[0];
if *count == 1 && !all_samples.is_empty() {
let sample = &all_samples[0];
let message = sample
.get("message")
.and_then(|v| v.clone().into_string().ok())
.unwrap_or_else(|| "unknown error".to_string());
let message = if message.len() > 80 {
format!("{}...", &message[..77])
} else {
message
};
format!("1 {} error: {}", error_type, message)
} else if *count <= 3 && all_samples.len() as i64 == *count {
let lines: Vec<String> = all_samples
.iter()
.map(|s| {
s.get("line_num")
.and_then(|v| v.as_int().ok())
.map(|n| n.to_string())
.unwrap_or_else(|| "?".to_string())
})
.collect();
format!(
"{} {} errors at lines {}",
count,
error_type,
lines.join(", ")
)
} else if !all_samples.is_empty() {
let first_line = all_samples[0]
.get("line_num")
.and_then(|v| v.as_int().ok())
.unwrap_or(0);
format!(
"{} {} errors (first at line {})",
count, error_type, first_line
)
} else {
format!("{} {} errors", count, error_type)
}
} else {
if total_errors <= 10 {
let breakdown: Vec<String> = error_types
.iter()
.map(|(t, c)| format!("{} {}", c, t))
.collect();
format!("{} errors: {}", total_errors, breakdown.join(", "))
} else {
format!("{} errors (mixed types)", total_errors)
}
}
}
pub fn extract_error_summary_from_tracking(
snapshot: &TrackingSnapshot,
verbose: u8,
stats: Option<&ProcessingStats>,
_config: Option<&crate::config::KeloraConfig>,
) -> Option<String> {
let mut total_errors = 0;
let mut error_types = Vec::new();
let mut sample_objects: Vec<rhai::Map> = Vec::new();
for (key, value) in &snapshot.internal {
if let Some(error_type) = key.strip_prefix("__kelora_error_count_") {
if let Ok(count) = value.as_int() {
if count > 0 {
total_errors += count;
error_types.push((error_type.to_string(), count));
}
}
}
}
if total_errors == 0 {
return None;
}
for (key, value) in &snapshot.internal {
if let Some(_error_type) = key.strip_prefix("__kelora_error_samples_") {
if let Ok(sample_array) = value.clone().into_array() {
for sample in sample_array {
if let Some(sample_map) = sample.try_cast::<rhai::Map>() {
sample_objects.push(sample_map);
}
}
}
}
}
let mut summary = String::new();
let primary_error_type = if error_types.len() == 1 {
&error_types[0].0
} else {
"mixed"
};
if primary_error_type == "mixed" {
summary.push_str(&format!("Mixed errors: {} total", total_errors));
} else {
summary.push_str(&format!(
"{}{} errors: {} total",
primary_error_type.chars().next().unwrap().to_uppercase(),
&primary_error_type[1..],
total_errors
));
}
let mut shown_samples = 0;
for sample_obj in &sample_objects {
if shown_samples >= 3 {
break;
}
let line_num = sample_obj
.get("line_num")
.and_then(|v| v.as_int().ok())
.unwrap_or(0);
let message = sample_obj
.get("message")
.and_then(|v| v.clone().into_string().ok())
.unwrap_or_else(|| "unknown error".to_string());
let filename = sample_obj
.get("filename")
.and_then(|v| v.clone().into_string().ok())
.unwrap_or_else(|| "stdin".to_string());
let original_line = sample_obj
.get("original_line")
.and_then(|v| v.clone().into_string().ok());
let input_files = &[];
let location = format_error_location(Some(line_num as usize), Some(&filename), input_files);
summary.push_str(&format!("\n {}: {}", location, message));
if verbose > 0 {
if let Some(orig_line) = original_line {
let display_line = if orig_line.len() > 100 {
format!("{}...", &orig_line[..97])
} else {
orig_line
};
summary.push_str(&format!("\n {}", display_line));
}
}
shown_samples += 1;
}
if total_errors as usize > shown_samples {
let remaining = total_errors as usize - shown_samples;
let message = if verbose > 0 {
"All errors shown during processing. Use --no-diagnostics to suppress this summary."
} else {
"Use -v to see each error or --no-diagnostics to suppress this summary."
};
summary.push_str(&format!("\n [+{} more. {}]", remaining, message));
}
if let Some(stats) = stats {
if stats.yearless_timestamps > 0 {
let warning_msg = format!(
"Year-less timestamp format detected ({} parse{})\n\
Format lacks year (e.g., \"Dec 31 23:59:59\")\n\
Year inferred using heuristic (+/- 1 year from current date)\n\
Timestamps >18 months old may be incorrect",
stats.yearless_timestamps,
if stats.yearless_timestamps == 1 {
""
} else {
"s"
}
);
summary.push_str("\n ");
summary.push_str(
&crate::config::format_warning_message_auto(&warning_msg).replace('\n', "\n "),
);
}
}
Some(summary)
}
fn serialize_tdigest(digest: &TDigest) -> Vec<u8> {
let centroids = digest.centroids();
let mut bytes = Vec::new();
let count = centroids.len();
bytes.extend_from_slice(&count.to_le_bytes());
for centroid in centroids {
bytes.extend_from_slice(¢roid.mean.to_le_bytes());
bytes.extend_from_slice(¢roid.weight.to_le_bytes());
}
bytes
}
fn deserialize_tdigest(bytes: &[u8]) -> Option<TDigest> {
if bytes.len() < 8 {
return None;
}
let count = usize::from_le_bytes(bytes[0..8].try_into().ok()?);
if bytes.len() < 8 + count * 16 {
return None;
}
let mut centroids = Vec::with_capacity(count);
for i in 0..count {
let offset = 8 + i * 16;
let mean = f64::from_le_bytes(bytes[offset..offset + 8].try_into().ok()?);
let weight = f64::from_le_bytes(bytes[offset + 8..offset + 16].try_into().ok()?);
centroids.push(tdigests::Centroid::new(mean, weight));
}
Some(TDigest::from_centroids(centroids))
}
const HLL_DEFAULT_ERROR_RATE: f64 = 0.01;
const HLL_SEED: u128 = 0x6b656c6f72615f686c6c5f73656564;
const HLL_MAGIC: &[u8; 4] = b"HLL\x01";
fn serialize_hll(hll: &HyperLogLog) -> Vec<u8> {
let mut bytes = Vec::new();
bytes.extend_from_slice(HLL_MAGIC);
if let Ok(json) = serde_json::to_vec(hll) {
bytes.extend_from_slice(&json);
}
bytes
}
fn deserialize_hll(bytes: &[u8]) -> Option<HyperLogLog> {
if bytes.len() < 4 || &bytes[0..4] != HLL_MAGIC {
return None;
}
serde_json::from_slice(&bytes[4..]).ok()
}
pub fn is_hll_blob(bytes: &[u8]) -> bool {
bytes.len() >= 4 && &bytes[0..4] == HLL_MAGIC
}
fn new_hll() -> HyperLogLog {
HyperLogLog::new_deterministic(HLL_DEFAULT_ERROR_RATE, HLL_SEED)
}
fn new_hll_with_error(error_rate: f64) -> HyperLogLog {
HyperLogLog::new_deterministic(error_rate, HLL_SEED)
}
fn track_cardinality_impl<V: std::hash::Hash>(key: &str, value: &V) {
with_user_tracking(|state| {
let mut hll = if let Some(existing) = state.get(key) {
if let Ok(bytes) = existing.clone().into_blob() {
deserialize_hll(&bytes).unwrap_or_else(new_hll)
} else {
new_hll()
}
} else {
new_hll()
};
hll.insert(value);
let bytes = serialize_hll(&hll);
state.insert(key.to_string(), Dynamic::from_blob(bytes));
});
record_operation_metadata(key, "cardinality");
}
fn track_cardinality_with_error_impl<V: std::hash::Hash>(key: &str, value: &V, error_rate: f64) {
let error_rate = error_rate.clamp(0.001, 0.26);
with_user_tracking(|state| {
let mut hll = if let Some(existing) = state.get(key) {
if let Ok(bytes) = existing.clone().into_blob() {
deserialize_hll(&bytes).unwrap_or_else(|| new_hll_with_error(error_rate))
} else {
new_hll_with_error(error_rate)
}
} else {
new_hll_with_error(error_rate)
};
hll.insert(value);
let bytes = serialize_hll(&hll);
state.insert(key.to_string(), Dynamic::from_blob(bytes));
});
record_operation_metadata(key, "cardinality");
}
fn track_percentiles_impl(
key: &str,
value: f64,
percentiles: rhai::Array,
) -> Result<(), Box<rhai::EvalAltResult>> {
if percentiles.is_empty() {
return Err("track_percentiles requires a non-empty array of percentiles".into());
}
let mut valid_percentiles = Vec::new();
let mut seen = HashSet::new();
for p in percentiles {
let percentile = if p.is_int() {
p.as_int().map_err(|_| -> Box<rhai::EvalAltResult> {
"track_percentiles percentile must be a number".into()
})? as f64
} else if p.is_float() {
p.as_float().map_err(|_| -> Box<rhai::EvalAltResult> {
"track_percentiles percentile must be a number".into()
})?
} else {
return Err("track_percentiles percentile must be a number".into());
};
if !(0.0..=1.0).contains(&percentile) {
return Err(format!(
"track_percentiles percentile must be in range [0.0, 1.0], got {}",
percentile
)
.into());
}
if !seen.contains(&percentile.to_bits()) {
seen.insert(percentile.to_bits());
valid_percentiles.push(percentile);
}
}
if !value.is_finite() {
return Ok(());
}
for percentile in valid_percentiles {
let percentage = percentile * 100.0;
let percentile_str = if percentage.fract() == 0.0 {
format!("p{}", percentage as i64)
} else {
let formatted = format!("{:.10}", percentage);
let trimmed = formatted.trim_end_matches('0').trim_end_matches('.');
format!("p{}", trimmed)
};
let metric_key = format!("{}_{}", key, percentile_str);
with_user_tracking(|state| {
let new_digest = TDigest::from_values(vec![value]);
let digest = if let Some(existing) = state.get(&metric_key) {
if let Ok(bytes) = existing.clone().into_blob() {
if let Some(existing_digest) = deserialize_tdigest(&bytes) {
existing_digest.merge(&new_digest)
} else {
new_digest
}
} else {
new_digest
}
} else {
new_digest
};
let bytes = serialize_tdigest(&digest);
state.insert(metric_key.clone(), Dynamic::from_blob(bytes));
});
record_operation_metadata(&metric_key, "percentiles");
}
Ok(())
}
pub fn register_functions(engine: &mut Engine) {
engine.register_fn(
"track_count",
|key: Dynamic| -> Result<(), Box<rhai::EvalAltResult>> {
let type_name = key.type_name().to_string();
let key = key.into_string().map_err(|_| -> Box<rhai::EvalAltResult> {
format!(
"track_count requires a string key; got {}. Hint: use to_string() for numbers (e.g. track_count(e.status.to_string()))",
type_name
)
.into()
})?;
with_user_tracking(|state| {
let updated =
merge_numeric(state.get(key.as_str()).cloned(), Dynamic::from(1_i64));
state.insert(key.to_string(), updated);
});
record_operation_metadata(&key, "count");
Ok(())
},
);
engine.register_fn("track_sum", |key: &str, value: i64| {
with_user_tracking(|state| {
let updated = merge_numeric(state.get(key).cloned(), Dynamic::from(value));
state.insert(key.to_string(), updated);
});
record_operation_metadata(key, "sum");
});
engine.register_fn("track_sum", |key: &str, value: i32| {
with_user_tracking(|state| {
let updated = merge_numeric(state.get(key).cloned(), Dynamic::from(value));
state.insert(key.to_string(), updated);
});
record_operation_metadata(key, "sum");
});
engine.register_fn("track_sum", |key: &str, value: f64| {
with_user_tracking(|state| {
let updated = merge_numeric(state.get(key).cloned(), Dynamic::from(value));
state.insert(key.to_string(), updated);
});
record_operation_metadata(key, "sum");
});
engine.register_fn("track_sum", |key: &str, value: f32| {
with_user_tracking(|state| {
let updated = merge_numeric(state.get(key).cloned(), Dynamic::from(value));
state.insert(key.to_string(), updated);
});
record_operation_metadata(key, "sum");
});
engine.register_fn("track_sum", |_key: &str, _value: ()| {
});
engine.register_fn("track_avg", |key: &str, value: i64| {
with_user_tracking(|state| {
let current = state.get(key).cloned();
let (new_sum, new_count) = if let Some(existing) = current {
if let Some(map) = existing.try_cast::<rhai::Map>() {
let existing_sum = map
.get("sum")
.and_then(|v| {
if v.is_float() {
v.as_float().ok()
} else if v.is_int() {
v.as_int().ok().map(|i| i as f64)
} else {
None
}
})
.unwrap_or(0.0);
let existing_count =
map.get("count").and_then(|v| v.as_int().ok()).unwrap_or(0);
(existing_sum + value as f64, existing_count + 1)
} else {
(value as f64, 1)
}
} else {
(value as f64, 1)
};
let mut map = rhai::Map::new();
map.insert("sum".into(), Dynamic::from(new_sum));
map.insert("count".into(), Dynamic::from(new_count));
state.insert(key.to_string(), Dynamic::from(map));
});
record_operation_metadata(key, "avg");
});
engine.register_fn("track_avg", |key: &str, value: i32| {
with_user_tracking(|state| {
let current = state.get(key).cloned();
let (new_sum, new_count) = if let Some(existing) = current {
if let Some(map) = existing.try_cast::<rhai::Map>() {
let existing_sum = map
.get("sum")
.and_then(|v| {
if v.is_float() {
v.as_float().ok()
} else if v.is_int() {
v.as_int().ok().map(|i| i as f64)
} else {
None
}
})
.unwrap_or(0.0);
let existing_count =
map.get("count").and_then(|v| v.as_int().ok()).unwrap_or(0);
(existing_sum + value as f64, existing_count + 1)
} else {
(value as f64, 1)
}
} else {
(value as f64, 1)
};
let mut map = rhai::Map::new();
map.insert("sum".into(), Dynamic::from(new_sum));
map.insert("count".into(), Dynamic::from(new_count));
state.insert(key.to_string(), Dynamic::from(map));
});
record_operation_metadata(key, "avg");
});
engine.register_fn("track_avg", |key: &str, value: f64| {
with_user_tracking(|state| {
let current = state.get(key).cloned();
let (new_sum, new_count) = if let Some(existing) = current {
if let Some(map) = existing.try_cast::<rhai::Map>() {
let existing_sum = map
.get("sum")
.and_then(|v| {
if v.is_float() {
v.as_float().ok()
} else if v.is_int() {
v.as_int().ok().map(|i| i as f64)
} else {
None
}
})
.unwrap_or(0.0);
let existing_count =
map.get("count").and_then(|v| v.as_int().ok()).unwrap_or(0);
(existing_sum + value, existing_count + 1)
} else {
(value, 1)
}
} else {
(value, 1)
};
let mut map = rhai::Map::new();
map.insert("sum".into(), Dynamic::from(new_sum));
map.insert("count".into(), Dynamic::from(new_count));
state.insert(key.to_string(), Dynamic::from(map));
});
record_operation_metadata(key, "avg");
});
engine.register_fn("track_avg", |key: &str, value: f32| {
with_user_tracking(|state| {
let current = state.get(key).cloned();
let (new_sum, new_count) = if let Some(existing) = current {
if let Some(map) = existing.try_cast::<rhai::Map>() {
let existing_sum = map
.get("sum")
.and_then(|v| {
if v.is_float() {
v.as_float().ok()
} else if v.is_int() {
v.as_int().ok().map(|i| i as f64)
} else {
None
}
})
.unwrap_or(0.0);
let existing_count =
map.get("count").and_then(|v| v.as_int().ok()).unwrap_or(0);
(existing_sum + value as f64, existing_count + 1)
} else {
(value as f64, 1)
}
} else {
(value as f64, 1)
};
let mut map = rhai::Map::new();
map.insert("sum".into(), Dynamic::from(new_sum));
map.insert("count".into(), Dynamic::from(new_count));
state.insert(key.to_string(), Dynamic::from(map));
});
record_operation_metadata(key, "avg");
});
engine.register_fn("track_avg", |_key: &str, _value: ()| {
});
engine.register_fn("track_min", |key: &str, value: i64| {
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or(Dynamic::from(f64::INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MAX) as f64
} else {
current.as_float().unwrap_or(f64::INFINITY)
};
let value_f64 = value as f64;
if value_f64 < current_val {
state.insert(key.to_string(), Dynamic::from(value));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "min");
}
});
engine.register_fn("track_min", |key: &str, value: i32| {
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or(Dynamic::from(f64::INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MAX) as f64
} else {
current.as_float().unwrap_or(f64::INFINITY)
};
let value_f64 = value as f64;
if value_f64 < current_val {
state.insert(key.to_string(), Dynamic::from(value));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "min");
}
});
engine.register_fn("track_min", |key: &str, value: f64| {
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or(Dynamic::from(f64::INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MAX) as f64
} else {
current.as_float().unwrap_or(f64::INFINITY)
};
if value < current_val {
state.insert(key.to_string(), Dynamic::from(value));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "min");
}
});
engine.register_fn("track_min", |key: &str, value: f32| {
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or(Dynamic::from(f64::INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MAX) as f64
} else {
current.as_float().unwrap_or(f64::INFINITY)
};
let value_f64 = value as f64;
if value_f64 < current_val {
state.insert(key.to_string(), Dynamic::from(value));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "min");
}
});
engine.register_fn("track_min", |_key: &str, _value: ()| {
});
engine.register_fn("track_max", |key: &str, value: i64| {
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or(Dynamic::from(f64::NEG_INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MIN) as f64
} else {
current.as_float().unwrap_or(f64::NEG_INFINITY)
};
let value_f64 = value as f64;
if value_f64 > current_val {
state.insert(key.to_string(), Dynamic::from(value));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "max");
}
});
engine.register_fn("track_max", |key: &str, value: i32| {
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or(Dynamic::from(f64::NEG_INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MIN) as f64
} else {
current.as_float().unwrap_or(f64::NEG_INFINITY)
};
let value_f64 = value as f64;
if value_f64 > current_val {
state.insert(key.to_string(), Dynamic::from(value));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "max");
}
});
engine.register_fn("track_max", |key: &str, value: f64| {
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or(Dynamic::from(f64::NEG_INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MIN) as f64
} else {
current.as_float().unwrap_or(f64::NEG_INFINITY)
};
if value > current_val {
state.insert(key.to_string(), Dynamic::from(value));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "max");
}
});
engine.register_fn("track_max", |key: &str, value: f32| {
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or(Dynamic::from(f64::NEG_INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MIN) as f64
} else {
current.as_float().unwrap_or(f64::NEG_INFINITY)
};
let value_f64 = value as f64;
if value_f64 > current_val {
state.insert(key.to_string(), Dynamic::from(value));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "max");
}
});
engine.register_fn("track_max", |_key: &str, _value: ()| {
});
engine.register_fn(
"track_percentiles",
|key: &str, value: i64, percentiles: rhai::Array| {
track_percentiles_impl(key, value as f64, percentiles)
},
);
engine.register_fn(
"track_percentiles",
|key: &str, value: i32, percentiles: rhai::Array| {
track_percentiles_impl(key, value as f64, percentiles)
},
);
engine.register_fn(
"track_percentiles",
|key: &str, value: f64, percentiles: rhai::Array| {
track_percentiles_impl(key, value, percentiles)
},
);
engine.register_fn(
"track_percentiles",
|key: &str, value: f32, percentiles: rhai::Array| {
track_percentiles_impl(key, value as f64, percentiles)
},
);
engine.register_fn(
"track_percentiles",
|_key: &str,
_value: (),
_percentiles: rhai::Array|
-> Result<(), Box<rhai::EvalAltResult>> {
Ok(())
},
);
engine.register_fn("track_percentiles", |key: &str, value: i64| {
let default_percentiles = vec![
Dynamic::from(0.50_f64),
Dynamic::from(0.95_f64),
Dynamic::from(0.99_f64),
];
track_percentiles_impl(key, value as f64, default_percentiles)
});
engine.register_fn("track_percentiles", |key: &str, value: i32| {
let default_percentiles = vec![
Dynamic::from(0.50_f64),
Dynamic::from(0.95_f64),
Dynamic::from(0.99_f64),
];
track_percentiles_impl(key, value as f64, default_percentiles)
});
engine.register_fn("track_percentiles", |key: &str, value: f64| {
let default_percentiles = vec![
Dynamic::from(0.50_f64),
Dynamic::from(0.95_f64),
Dynamic::from(0.99_f64),
];
track_percentiles_impl(key, value, default_percentiles)
});
engine.register_fn("track_percentiles", |key: &str, value: f32| {
let default_percentiles = vec![
Dynamic::from(0.50_f64),
Dynamic::from(0.95_f64),
Dynamic::from(0.99_f64),
];
track_percentiles_impl(key, value as f64, default_percentiles)
});
engine.register_fn("track_percentiles", |_key: &str, _value: ()| {
});
fn track_stats_impl(
key: &str,
value: f64,
percentiles: rhai::Array,
) -> Result<(), Box<rhai::EvalAltResult>> {
if !value.is_finite() {
return Ok(());
}
let min_key = format!("{}_min", key);
with_user_tracking(|state| {
let current = state
.get(&min_key)
.cloned()
.unwrap_or(Dynamic::from(f64::INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MAX) as f64
} else {
current.as_float().unwrap_or(f64::INFINITY)
};
if value < current_val {
state.insert(min_key.clone(), Dynamic::from(value));
}
});
record_operation_metadata(&min_key, "min");
let max_key = format!("{}_max", key);
with_user_tracking(|state| {
let current = state
.get(&max_key)
.cloned()
.unwrap_or(Dynamic::from(f64::NEG_INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MIN) as f64
} else {
current.as_float().unwrap_or(f64::NEG_INFINITY)
};
if value > current_val {
state.insert(max_key.clone(), Dynamic::from(value));
}
});
record_operation_metadata(&max_key, "max");
let avg_key = format!("{}_avg", key);
with_user_tracking(|state| {
let current = state.get(&avg_key).cloned();
let (new_sum, new_count) = if let Some(existing) = current {
if let Some(map) = existing.try_cast::<rhai::Map>() {
let existing_sum = map
.get("sum")
.and_then(|v| {
if v.is_float() {
v.as_float().ok()
} else if v.is_int() {
v.as_int().ok().map(|i| i as f64)
} else {
None
}
})
.unwrap_or(0.0);
let existing_count =
map.get("count").and_then(|v| v.as_int().ok()).unwrap_or(0);
(existing_sum + value, existing_count + 1)
} else {
(value, 1)
}
} else {
(value, 1)
};
let mut map = rhai::Map::new();
map.insert("sum".into(), Dynamic::from(new_sum));
map.insert("count".into(), Dynamic::from(new_count));
state.insert(avg_key.clone(), Dynamic::from(map));
});
record_operation_metadata(&avg_key, "avg");
let count_key = format!("{}_count", key);
with_user_tracking(|state| {
let updated = merge_numeric(state.get(&count_key).cloned(), Dynamic::from(1_i64));
state.insert(count_key.clone(), updated);
});
record_operation_metadata(&count_key, "count");
let sum_key = format!("{}_sum", key);
with_user_tracking(|state| {
let updated = merge_numeric(state.get(&sum_key).cloned(), Dynamic::from(value));
state.insert(sum_key.clone(), updated);
});
record_operation_metadata(&sum_key, "sum");
track_percentiles_impl(key, value, percentiles)?;
Ok(())
}
engine.register_fn(
"track_stats",
|key: &str, value: i64, percentiles: rhai::Array| {
track_stats_impl(key, value as f64, percentiles)
},
);
engine.register_fn(
"track_stats",
|key: &str, value: i32, percentiles: rhai::Array| {
track_stats_impl(key, value as f64, percentiles)
},
);
engine.register_fn(
"track_stats",
|key: &str, value: f64, percentiles: rhai::Array| track_stats_impl(key, value, percentiles),
);
engine.register_fn(
"track_stats",
|key: &str, value: f32, percentiles: rhai::Array| {
track_stats_impl(key, value as f64, percentiles)
},
);
engine.register_fn(
"track_stats",
|_key: &str,
_value: (),
_percentiles: rhai::Array|
-> Result<(), Box<rhai::EvalAltResult>> {
Ok(())
},
);
engine.register_fn("track_stats", |key: &str, value: i64| {
let default_percentiles = vec![
Dynamic::from(0.50_f64),
Dynamic::from(0.95_f64),
Dynamic::from(0.99_f64),
];
track_stats_impl(key, value as f64, default_percentiles)
});
engine.register_fn("track_stats", |key: &str, value: i32| {
let default_percentiles = vec![
Dynamic::from(0.50_f64),
Dynamic::from(0.95_f64),
Dynamic::from(0.99_f64),
];
track_stats_impl(key, value as f64, default_percentiles)
});
engine.register_fn("track_stats", |key: &str, value: f64| {
let default_percentiles = vec![
Dynamic::from(0.50_f64),
Dynamic::from(0.95_f64),
Dynamic::from(0.99_f64),
];
track_stats_impl(key, value, default_percentiles)
});
engine.register_fn("track_stats", |key: &str, value: f32| {
let default_percentiles = vec![
Dynamic::from(0.50_f64),
Dynamic::from(0.95_f64),
Dynamic::from(0.99_f64),
];
track_stats_impl(key, value as f64, default_percentiles)
});
engine.register_fn("track_stats", |_key: &str, _value: ()| {
});
engine.register_fn("track_unique", |key: &str, value: &str| {
let updated = with_user_tracking(|state| {
let current = state.get(key).cloned().unwrap_or_else(|| {
Dynamic::from(rhai::Array::new())
});
if let Ok(mut arr) = current.into_array() {
let value_dynamic = Dynamic::from(value.to_string());
if !arr
.iter()
.any(|v| v.clone().into_string().unwrap_or_default() == value)
{
arr.push(value_dynamic);
}
state.insert(key.to_string(), Dynamic::from(arr));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "unique");
}
});
engine.register_fn("track_unique", |key: &str, value: i64| {
let updated = with_user_tracking(|state| {
let current = state.get(key).cloned().unwrap_or_else(|| {
Dynamic::from(rhai::Array::new())
});
if let Ok(mut arr) = current.into_array() {
let value_dynamic = Dynamic::from(value);
if !arr.iter().any(|v| v.as_int().unwrap_or(i64::MIN) == value) {
arr.push(value_dynamic);
}
state.insert(key.to_string(), Dynamic::from(arr));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "unique");
}
});
engine.register_fn("track_unique", |key: &str, value: i32| {
let updated = with_user_tracking(|state| {
let current = state.get(key).cloned().unwrap_or_else(|| {
Dynamic::from(rhai::Array::new())
});
if let Ok(mut arr) = current.into_array() {
let value_dynamic = Dynamic::from(value);
if !arr
.iter()
.any(|v| v.as_int().unwrap_or(i64::MIN) == (value as i64))
{
arr.push(value_dynamic);
}
state.insert(key.to_string(), Dynamic::from(arr));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "unique");
}
});
engine.register_fn("track_unique", |key: &str, value: f64| {
let updated = with_user_tracking(|state| {
let current = state.get(key).cloned().unwrap_or_else(|| {
Dynamic::from(rhai::Array::new())
});
if let Ok(mut arr) = current.into_array() {
let value_dynamic = Dynamic::from(value);
if !arr
.iter()
.any(|v| v.as_float().unwrap_or(f64::NAN) == value)
{
arr.push(value_dynamic);
}
state.insert(key.to_string(), Dynamic::from(arr));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "unique");
}
});
engine.register_fn("track_unique", |key: &str, value: f32| {
let updated = with_user_tracking(|state| {
let current = state.get(key).cloned().unwrap_or_else(|| {
Dynamic::from(rhai::Array::new())
});
if let Ok(mut arr) = current.into_array() {
let value_dynamic = Dynamic::from(value);
if !arr
.iter()
.any(|v| v.as_float().unwrap_or(f64::NAN) == (value as f64))
{
arr.push(value_dynamic);
}
state.insert(key.to_string(), Dynamic::from(arr));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "unique");
}
});
engine.register_fn("track_unique", |_key: &str, _value: ()| {
});
engine.register_fn("track_cardinality", |key: &str, value: &str| {
let s = value.to_string();
track_cardinality_impl(key, &s);
});
engine.register_fn("track_cardinality", |key: &str, value: i64| {
track_cardinality_impl(key, &value);
});
engine.register_fn("track_cardinality", |key: &str, value: i32| {
let v = value as i64;
track_cardinality_impl(key, &v);
});
engine.register_fn("track_cardinality", |key: &str, value: f64| {
let bits = value.to_bits();
track_cardinality_impl(key, &bits);
});
engine.register_fn("track_cardinality", |key: &str, value: f32| {
let bits = (value as f64).to_bits();
track_cardinality_impl(key, &bits);
});
engine.register_fn(
"track_cardinality",
|key: &str, value: &str, error_rate: f64| {
let s = value.to_string();
track_cardinality_with_error_impl(key, &s, error_rate);
},
);
engine.register_fn(
"track_cardinality",
|key: &str, value: i64, error_rate: f64| {
track_cardinality_with_error_impl(key, &value, error_rate);
},
);
engine.register_fn(
"track_cardinality",
|key: &str, value: i32, error_rate: f64| {
let v = value as i64;
track_cardinality_with_error_impl(key, &v, error_rate);
},
);
engine.register_fn(
"track_cardinality",
|key: &str, value: f64, error_rate: f64| {
let bits = value.to_bits();
track_cardinality_with_error_impl(key, &bits, error_rate);
},
);
engine.register_fn(
"track_cardinality",
|key: &str, value: f32, error_rate: f64| {
let bits = (value as f64).to_bits();
track_cardinality_with_error_impl(key, &bits, error_rate);
},
);
engine.register_fn("track_cardinality", |_key: &str, _value: ()| {
});
engine.register_fn(
"track_cardinality",
|_key: &str, _value: (), _error_rate: f64| {
},
);
engine.register_fn("track_bucket", |key: &str, bucket: &str| {
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or_else(|| Dynamic::from(rhai::Map::new()));
if let Some(mut map) = current.try_cast::<rhai::Map>() {
let count = map.get(bucket).cloned().unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + 1;
map.insert(bucket.into(), Dynamic::from(new_count));
state.insert(key.to_string(), Dynamic::from(map));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "bucket");
}
});
engine.register_fn("track_bucket", |key: &str, bucket: i64| {
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or_else(|| Dynamic::from(rhai::Map::new()));
if let Some(mut map) = current.try_cast::<rhai::Map>() {
let bucket_str = bucket.to_string();
let count = map
.get(bucket_str.as_str())
.cloned()
.unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + 1;
map.insert(bucket_str.into(), Dynamic::from(new_count));
state.insert(key.to_string(), Dynamic::from(map));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "bucket");
}
});
engine.register_fn("track_bucket", |key: &str, bucket: i32| {
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or_else(|| Dynamic::from(rhai::Map::new()));
if let Some(mut map) = current.try_cast::<rhai::Map>() {
let bucket_str = bucket.to_string();
let count = map
.get(bucket_str.as_str())
.cloned()
.unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + 1;
map.insert(bucket_str.into(), Dynamic::from(new_count));
state.insert(key.to_string(), Dynamic::from(map));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "bucket");
}
});
engine.register_fn("track_bucket", |key: &str, bucket: f64| {
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or_else(|| Dynamic::from(rhai::Map::new()));
if let Some(mut map) = current.try_cast::<rhai::Map>() {
let bucket_str = bucket.to_string();
let count = map
.get(bucket_str.as_str())
.cloned()
.unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + 1;
map.insert(bucket_str.into(), Dynamic::from(new_count));
state.insert(key.to_string(), Dynamic::from(map));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "bucket");
}
});
engine.register_fn("track_bucket", |key: &str, bucket: f32| {
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or_else(|| Dynamic::from(rhai::Map::new()));
if let Some(mut map) = current.try_cast::<rhai::Map>() {
let bucket_str = bucket.to_string();
let count = map
.get(bucket_str.as_str())
.cloned()
.unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + 1;
map.insert(bucket_str.into(), Dynamic::from(new_count));
state.insert(key.to_string(), Dynamic::from(map));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "bucket");
}
});
engine.register_fn("track_bucket", |_key: &str, _value: ()| {
});
engine.register_fn(
"track_top",
|key: &str, item_key: &str, n: i64| -> Result<(), Box<rhai::EvalAltResult>> {
if n < 1 {
return Err(format!("track_top requires n >= 1, got {}", n).into());
}
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or_else(|| Dynamic::from(rhai::Array::new()));
if let Ok(mut arr) = current.into_array() {
let mut found_idx = None;
for (idx, elem) in arr.iter().enumerate() {
if let Some(map) = elem.clone().try_cast::<rhai::Map>() {
if let Some(k) = map.get("key") {
if k.clone().into_string().unwrap_or_default() == item_key {
found_idx = Some(idx);
break;
}
}
}
}
if let Some(idx) = found_idx {
if let Some(map) = arr[idx].clone().try_cast::<rhai::Map>() {
let count = map.get("count").cloned().unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + 1;
let mut new_map = rhai::Map::new();
new_map.insert("key".into(), Dynamic::from(item_key.to_string()));
new_map.insert("count".into(), Dynamic::from(new_count));
arr[idx] = Dynamic::from(new_map);
}
} else {
let mut new_map = rhai::Map::new();
new_map.insert("key".into(), Dynamic::from(item_key.to_string()));
new_map.insert("count".into(), Dynamic::from(1i64));
arr.push(Dynamic::from(new_map));
}
arr.sort_by(|a, b| {
let a_map = a.clone().try_cast::<rhai::Map>();
let b_map = b.clone().try_cast::<rhai::Map>();
if let (Some(a_m), Some(b_m)) = (a_map, b_map) {
let a_count =
a_m.get("count").and_then(|v| v.as_int().ok()).unwrap_or(0);
let b_count =
b_m.get("count").and_then(|v| v.as_int().ok()).unwrap_or(0);
let a_key = a_m
.get("key")
.and_then(|v| v.clone().into_string().ok())
.unwrap_or_default();
let b_key = b_m
.get("key")
.and_then(|v| v.clone().into_string().ok())
.unwrap_or_default();
match b_count.cmp(&a_count) {
std::cmp::Ordering::Equal => a_key.cmp(&b_key),
other => other,
}
} else {
std::cmp::Ordering::Equal
}
});
if arr.len() > n as usize {
arr.truncate(n as usize);
}
state.insert(key.to_string(), Dynamic::from(arr));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "top");
}
Ok(())
},
);
engine.register_fn(
"track_top",
|_key: &str, _item_key: (), _n: i64| -> Result<(), Box<rhai::EvalAltResult>> { Ok(()) },
);
engine.register_fn(
"track_top",
|key: &str, item_key: &str, n: i64, value: i64| -> Result<(), Box<rhai::EvalAltResult>> {
if n < 1 {
return Err(format!("track_top requires n >= 1, got {}", n).into());
}
track_top_weighted_impl(key, item_key, n, value as f64)
},
);
engine.register_fn(
"track_top",
|key: &str, item_key: &str, n: i64, value: i32| -> Result<(), Box<rhai::EvalAltResult>> {
if n < 1 {
return Err(format!("track_top requires n >= 1, got {}", n).into());
}
track_top_weighted_impl(key, item_key, n, value as f64)
},
);
engine.register_fn(
"track_top",
|key: &str, item_key: &str, n: i64, value: f64| -> Result<(), Box<rhai::EvalAltResult>> {
if n < 1 {
return Err(format!("track_top requires n >= 1, got {}", n).into());
}
track_top_weighted_impl(key, item_key, n, value)
},
);
engine.register_fn(
"track_top",
|key: &str, item_key: &str, n: i64, value: f32| -> Result<(), Box<rhai::EvalAltResult>> {
if n < 1 {
return Err(format!("track_top requires n >= 1, got {}", n).into());
}
track_top_weighted_impl(key, item_key, n, value as f64)
},
);
engine.register_fn(
"track_top",
|_key: &str, _item_key: (), _n: i64, _value: i64| -> Result<(), Box<rhai::EvalAltResult>> {
Ok(())
},
);
engine.register_fn(
"track_top",
|_key: &str, _item_key: (), _n: i64, _value: i32| -> Result<(), Box<rhai::EvalAltResult>> {
Ok(())
},
);
engine.register_fn(
"track_top",
|_key: &str, _item_key: (), _n: i64, _value: f64| -> Result<(), Box<rhai::EvalAltResult>> {
Ok(())
},
);
engine.register_fn(
"track_top",
|_key: &str, _item_key: (), _n: i64, _value: f32| -> Result<(), Box<rhai::EvalAltResult>> {
Ok(())
},
);
engine.register_fn(
"track_top",
|_key: &str,
_item_key: &str,
_n: i64,
_value: ()|
-> Result<(), Box<rhai::EvalAltResult>> { Ok(()) },
);
engine.register_fn(
"track_top",
|_key: &str, _item_key: (), _n: i64, _value: ()| -> Result<(), Box<rhai::EvalAltResult>> {
Ok(())
},
);
engine.register_fn(
"track_bottom",
|key: &str, item_key: &str, n: i64| -> Result<(), Box<rhai::EvalAltResult>> {
if n < 1 {
return Err(format!("track_bottom requires n >= 1, got {}", n).into());
}
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or_else(|| Dynamic::from(rhai::Array::new()));
if let Ok(mut arr) = current.into_array() {
let mut found_idx = None;
for (idx, elem) in arr.iter().enumerate() {
if let Some(map) = elem.clone().try_cast::<rhai::Map>() {
if let Some(k) = map.get("key") {
if k.clone().into_string().unwrap_or_default() == item_key {
found_idx = Some(idx);
break;
}
}
}
}
if let Some(idx) = found_idx {
if let Some(map) = arr[idx].clone().try_cast::<rhai::Map>() {
let count = map.get("count").cloned().unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + 1;
let mut new_map = rhai::Map::new();
new_map.insert("key".into(), Dynamic::from(item_key.to_string()));
new_map.insert("count".into(), Dynamic::from(new_count));
arr[idx] = Dynamic::from(new_map);
}
} else {
let mut new_map = rhai::Map::new();
new_map.insert("key".into(), Dynamic::from(item_key.to_string()));
new_map.insert("count".into(), Dynamic::from(1i64));
arr.push(Dynamic::from(new_map));
}
arr.sort_by(|a, b| {
let a_map = a.clone().try_cast::<rhai::Map>();
let b_map = b.clone().try_cast::<rhai::Map>();
if let (Some(a_m), Some(b_m)) = (a_map, b_map) {
let a_count =
a_m.get("count").and_then(|v| v.as_int().ok()).unwrap_or(0);
let b_count =
b_m.get("count").and_then(|v| v.as_int().ok()).unwrap_or(0);
let a_key = a_m
.get("key")
.and_then(|v| v.clone().into_string().ok())
.unwrap_or_default();
let b_key = b_m
.get("key")
.and_then(|v| v.clone().into_string().ok())
.unwrap_or_default();
match a_count.cmp(&b_count) {
std::cmp::Ordering::Equal => a_key.cmp(&b_key),
other => other,
}
} else {
std::cmp::Ordering::Equal
}
});
if arr.len() > n as usize {
arr.truncate(n as usize);
}
state.insert(key.to_string(), Dynamic::from(arr));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "bottom");
}
Ok(())
},
);
engine.register_fn(
"track_bottom",
|_key: &str, _item_key: (), _n: i64| -> Result<(), Box<rhai::EvalAltResult>> { Ok(()) },
);
engine.register_fn(
"track_bottom",
|key: &str, item_key: &str, n: i64, value: i64| -> Result<(), Box<rhai::EvalAltResult>> {
if n < 1 {
return Err(format!("track_bottom requires n >= 1, got {}", n).into());
}
track_bottom_weighted_impl(key, item_key, n, value as f64)
},
);
engine.register_fn(
"track_bottom",
|key: &str, item_key: &str, n: i64, value: i32| -> Result<(), Box<rhai::EvalAltResult>> {
if n < 1 {
return Err(format!("track_bottom requires n >= 1, got {}", n).into());
}
track_bottom_weighted_impl(key, item_key, n, value as f64)
},
);
engine.register_fn(
"track_bottom",
|key: &str, item_key: &str, n: i64, value: f64| -> Result<(), Box<rhai::EvalAltResult>> {
if n < 1 {
return Err(format!("track_bottom requires n >= 1, got {}", n).into());
}
track_bottom_weighted_impl(key, item_key, n, value)
},
);
engine.register_fn(
"track_bottom",
|key: &str, item_key: &str, n: i64, value: f32| -> Result<(), Box<rhai::EvalAltResult>> {
if n < 1 {
return Err(format!("track_bottom requires n >= 1, got {}", n).into());
}
track_bottom_weighted_impl(key, item_key, n, value as f64)
},
);
engine.register_fn(
"track_bottom",
|_key: &str, _item_key: (), _n: i64, _value: i64| -> Result<(), Box<rhai::EvalAltResult>> {
Ok(())
},
);
engine.register_fn(
"track_bottom",
|_key: &str, _item_key: (), _n: i64, _value: i32| -> Result<(), Box<rhai::EvalAltResult>> {
Ok(())
},
);
engine.register_fn(
"track_bottom",
|_key: &str, _item_key: (), _n: i64, _value: f64| -> Result<(), Box<rhai::EvalAltResult>> {
Ok(())
},
);
engine.register_fn(
"track_bottom",
|_key: &str, _item_key: (), _n: i64, _value: f32| -> Result<(), Box<rhai::EvalAltResult>> {
Ok(())
},
);
engine.register_fn(
"track_bottom",
|_key: &str,
_item_key: &str,
_n: i64,
_value: ()|
-> Result<(), Box<rhai::EvalAltResult>> { Ok(()) },
);
engine.register_fn(
"track_bottom",
|_key: &str, _item_key: (), _n: i64, _value: ()| -> Result<(), Box<rhai::EvalAltResult>> {
Ok(())
},
);
}
fn track_top_weighted_impl(
key: &str,
item_key: &str,
n: i64,
value: f64,
) -> Result<(), Box<rhai::EvalAltResult>> {
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or_else(|| Dynamic::from(rhai::Array::new()));
if let Ok(mut arr) = current.into_array() {
let mut found_idx = None;
for (idx, elem) in arr.iter().enumerate() {
if let Some(map) = elem.clone().try_cast::<rhai::Map>() {
if let Some(k) = map.get("key") {
if k.clone().into_string().unwrap_or_default() == item_key {
found_idx = Some(idx);
break;
}
}
}
}
if let Some(idx) = found_idx {
if let Some(map) = arr[idx].clone().try_cast::<rhai::Map>() {
let current_val = map
.get("value")
.and_then(|v| v.as_float().ok())
.unwrap_or(f64::NEG_INFINITY);
let new_val = value.max(current_val);
let mut new_map = rhai::Map::new();
new_map.insert("key".into(), Dynamic::from(item_key.to_string()));
new_map.insert("value".into(), Dynamic::from(new_val));
arr[idx] = Dynamic::from(new_map);
}
} else {
let mut new_map = rhai::Map::new();
new_map.insert("key".into(), Dynamic::from(item_key.to_string()));
new_map.insert("value".into(), Dynamic::from(value));
arr.push(Dynamic::from(new_map));
}
arr.sort_by(|a, b| {
let a_map = a.clone().try_cast::<rhai::Map>();
let b_map = b.clone().try_cast::<rhai::Map>();
if let (Some(a_m), Some(b_m)) = (a_map, b_map) {
let a_val = a_m
.get("value")
.and_then(|v| v.as_float().ok())
.unwrap_or(f64::NEG_INFINITY);
let b_val = b_m
.get("value")
.and_then(|v| v.as_float().ok())
.unwrap_or(f64::NEG_INFINITY);
let a_key = a_m
.get("key")
.and_then(|v| v.clone().into_string().ok())
.unwrap_or_default();
let b_key = b_m
.get("key")
.and_then(|v| v.clone().into_string().ok())
.unwrap_or_default();
match b_val
.partial_cmp(&a_val)
.unwrap_or(std::cmp::Ordering::Equal)
{
std::cmp::Ordering::Equal => a_key.cmp(&b_key),
other => other,
}
} else {
std::cmp::Ordering::Equal
}
});
if arr.len() > n as usize {
arr.truncate(n as usize);
}
state.insert(key.to_string(), Dynamic::from(arr));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "top");
}
Ok(())
}
fn track_bottom_weighted_impl(
key: &str,
item_key: &str,
n: i64,
value: f64,
) -> Result<(), Box<rhai::EvalAltResult>> {
let updated = with_user_tracking(|state| {
let current = state
.get(key)
.cloned()
.unwrap_or_else(|| Dynamic::from(rhai::Array::new()));
if let Ok(mut arr) = current.into_array() {
let mut found_idx = None;
for (idx, elem) in arr.iter().enumerate() {
if let Some(map) = elem.clone().try_cast::<rhai::Map>() {
if let Some(k) = map.get("key") {
if k.clone().into_string().unwrap_or_default() == item_key {
found_idx = Some(idx);
break;
}
}
}
}
if let Some(idx) = found_idx {
if let Some(map) = arr[idx].clone().try_cast::<rhai::Map>() {
let current_val = map
.get("value")
.and_then(|v| v.as_float().ok())
.unwrap_or(f64::INFINITY);
let new_val = value.min(current_val);
let mut new_map = rhai::Map::new();
new_map.insert("key".into(), Dynamic::from(item_key.to_string()));
new_map.insert("value".into(), Dynamic::from(new_val));
arr[idx] = Dynamic::from(new_map);
}
} else {
let mut new_map = rhai::Map::new();
new_map.insert("key".into(), Dynamic::from(item_key.to_string()));
new_map.insert("value".into(), Dynamic::from(value));
arr.push(Dynamic::from(new_map));
}
arr.sort_by(|a, b| {
let a_map = a.clone().try_cast::<rhai::Map>();
let b_map = b.clone().try_cast::<rhai::Map>();
if let (Some(a_m), Some(b_m)) = (a_map, b_map) {
let a_val = a_m
.get("value")
.and_then(|v| v.as_float().ok())
.unwrap_or(f64::INFINITY);
let b_val = b_m
.get("value")
.and_then(|v| v.as_float().ok())
.unwrap_or(f64::INFINITY);
let a_key = a_m
.get("key")
.and_then(|v| v.clone().into_string().ok())
.unwrap_or_default();
let b_key = b_m
.get("key")
.and_then(|v| v.clone().into_string().ok())
.unwrap_or_default();
match a_val
.partial_cmp(&b_val)
.unwrap_or(std::cmp::Ordering::Equal)
{
std::cmp::Ordering::Equal => a_key.cmp(&b_key),
other => other,
}
} else {
std::cmp::Ordering::Equal
}
});
if arr.len() > n as usize {
arr.truncate(n as usize);
}
state.insert(key.to_string(), Dynamic::from(arr));
true
} else {
false
}
});
if updated {
record_operation_metadata(key, "bottom");
}
Ok(())
}
pub fn merge_thread_tracking_to_context(ctx: &mut crate::pipeline::PipelineContext) {
let snapshot = get_thread_snapshot();
for (key, value) in snapshot.user {
ctx.tracker.insert(key, value);
}
for (key, value) in snapshot.internal {
ctx.internal_tracker.insert(key, value);
}
}
pub fn format_metrics_output(metrics: &HashMap<String, Dynamic>, metrics_level: u8) -> String {
let mut output = String::new();
let mut user_values: Vec<_> = metrics
.iter()
.filter(|(k, _)| !k.starts_with("__op_") && !k.starts_with("__kelora_stats_"))
.collect();
if user_values.is_empty() {
return "No metrics tracked".to_string();
}
user_values.sort_by_key(|(k, _)| k.as_str());
for (key, value) in user_values {
if value.is::<rhai::Array>() {
if let Ok(arr) = value.clone().into_array() {
let len = arr.len();
let is_top_bottom = if !arr.is_empty() {
if let Some(first_map) = arr[0].clone().try_cast::<rhai::Map>() {
first_map.contains_key("key")
&& (first_map.contains_key("count") || first_map.contains_key("value"))
} else {
false
}
} else {
false
};
if is_top_bottom {
let field_name = if let Some(first_map) = arr[0].clone().try_cast::<rhai::Map>()
{
if first_map.contains_key("count") {
"count"
} else {
"value"
}
} else {
"count"
};
if metrics_level >= 2 {
output.push_str(&format!("{:<12} ({} items):\n", key, len));
for (idx, item) in arr.iter().enumerate() {
if let Some(map) = item.clone().try_cast::<rhai::Map>() {
if let (Some(k), Some(v)) = (map.get("key"), map.get(field_name)) {
let key_str = k.clone().into_string().unwrap_or_default();
if field_name == "count" {
let count = v.as_int().unwrap_or(0);
output.push_str(&format!(
" #{:<2} {:<30} {}\n",
idx + 1,
key_str,
count
));
} else {
let val = v.as_float().unwrap_or(0.0);
output.push_str(&format!(
" #{:<2} {:<30} {:.2}\n",
idx + 1,
key_str,
val
));
}
}
}
}
} else if len <= 10 {
output.push_str(&format!("{:<12} ({} items):\n", key, len));
for (idx, item) in arr.iter().enumerate() {
if let Some(map) = item.clone().try_cast::<rhai::Map>() {
if let (Some(k), Some(v)) = (map.get("key"), map.get(field_name)) {
let key_str = k.clone().into_string().unwrap_or_default();
if field_name == "count" {
let count = v.as_int().unwrap_or(0);
output.push_str(&format!(
" #{:<2} {:<30} {}\n",
idx + 1,
key_str,
count
));
} else {
let val = v.as_float().unwrap_or(0.0);
output.push_str(&format!(
" #{:<2} {:<30} {:.2}\n",
idx + 1,
key_str,
val
));
}
}
}
}
} else {
output.push_str(&format!("{:<12} ({} items):\n", key, len));
for (idx, item) in arr.iter().take(5).enumerate() {
if let Some(map) = item.clone().try_cast::<rhai::Map>() {
if let (Some(k), Some(v)) = (map.get("key"), map.get(field_name)) {
let key_str = k.clone().into_string().unwrap_or_default();
if field_name == "count" {
let count = v.as_int().unwrap_or(0);
output.push_str(&format!(
" #{:<2} {:<30} {}\n",
idx + 1,
key_str,
count
));
} else {
let val = v.as_float().unwrap_or(0.0);
output.push_str(&format!(
" #{:<2} {:<30} {:.2}\n",
idx + 1,
key_str,
val
));
}
}
}
}
output.push_str(&format!(
" [+{} more. Use --metrics=full or --metrics-file for full list]\n",
len - 5
));
}
} else {
if metrics_level >= 2 {
output.push_str(&format!("{:<12} ({} unique):\n", key, len));
for item in arr.iter() {
output.push_str(&format!(" {}\n", item));
}
} else if len <= 10 {
output.push_str(&format!("{:<12} = {}\n", key, value));
} else {
output.push_str(&format!("{:<12} ({} unique):\n", key, len));
for item in arr.iter().take(5) {
output.push_str(&format!(" {}\n", item));
}
output.push_str(&format!(
" [+{} more. Use --metrics=full or --metrics-file for full list]\n",
len - 5
));
}
}
continue;
}
}
if let Some(map) = value.clone().try_cast::<rhai::Map>() {
if map.contains_key("sum") && map.contains_key("count") {
let sum = map
.get("sum")
.and_then(|v| {
if v.is_float() {
v.as_float().ok()
} else if v.is_int() {
v.as_int().ok().map(|i| i as f64)
} else {
None
}
})
.unwrap_or(0.0);
let count = map.get("count").and_then(|v| v.as_int().ok()).unwrap_or(0);
let avg = if count > 0 { sum / count as f64 } else { 0.0 };
output.push_str(&format!("{:<12} = {}\n", key, avg));
continue;
}
}
if let Ok(blob) = value.clone().into_blob() {
if is_hll_blob(&blob) {
if let Some(hll) = deserialize_hll(&blob) {
let cardinality = hll.len();
output.push_str(&format!("{:<12} ≈ {}\n", key, cardinality));
continue;
}
}
if let Some(digest) = deserialize_tdigest(&blob) {
if let Some(p_pos) = key.rfind("_p") {
if let Ok(percentile) = key[p_pos + 2..].parse::<f64>() {
let quantile = percentile / 100.0;
let value = digest.estimate_quantile(quantile);
output.push_str(&format!("{:<12} = {:.2}\n", key, value));
continue;
}
}
}
}
if value.is_int() {
output.push_str(&format!("{:<12} = {}\n", key, value.as_int().unwrap_or(0)));
} else if value.is_float() {
output.push_str(&format!(
"{:<12} = {}\n",
key,
value.as_float().unwrap_or(0.0)
));
} else {
output.push_str(&format!("{:<12} = {}\n", key, value));
}
}
output.trim_end().to_string()
}
fn dynamic_to_json(value: Dynamic) -> serde_json::Value {
if value.is_unit() {
return serde_json::Value::Null;
}
if value.is::<rhai::Array>() {
if let Ok(array) = value.clone().into_array() {
let json_array = array.into_iter().map(dynamic_to_json).collect();
return serde_json::Value::Array(json_array);
}
}
if value.is::<rhai::Map>() {
if let Some(map) = value.clone().try_cast::<rhai::Map>() {
let mut json_map = serde_json::Map::new();
for (k, v) in map {
json_map.insert(k.into(), dynamic_to_json(v));
}
return serde_json::Value::Object(json_map);
}
}
if value.is_int() {
return serde_json::Value::Number(serde_json::Number::from(
value.as_int().unwrap_or_default(),
));
}
if value.is_float() {
if let Some(num) = serde_json::Number::from_f64(value.as_float().unwrap_or_default()) {
return serde_json::Value::Number(num);
}
}
if let Some(boolean) = value.clone().try_cast::<bool>() {
return serde_json::Value::Bool(boolean);
}
if let Some(string) = value.clone().try_cast::<rhai::ImmutableString>() {
return serde_json::Value::String(string.into());
}
serde_json::Value::String(value.to_string())
}
pub fn format_metrics_json(
metrics: &HashMap<String, Dynamic>,
) -> Result<String, serde_json::Error> {
let mut json_obj = serde_json::Map::new();
for (key, value) in metrics.iter() {
if key.starts_with("__op_")
|| key.starts_with("__kelora_stats_")
|| key.starts_with("__kelora_error_")
{
continue;
}
if let Some(map) = value.clone().try_cast::<rhai::Map>() {
if map.contains_key("sum") && map.contains_key("count") {
let sum = map
.get("sum")
.and_then(|v| {
if v.is_float() {
v.as_float().ok()
} else if v.is_int() {
v.as_int().ok().map(|i| i as f64)
} else {
None
}
})
.unwrap_or(0.0);
let count = map.get("count").and_then(|v| v.as_int().ok()).unwrap_or(0);
let avg = if count > 0 { sum / count as f64 } else { 0.0 };
if let Some(num) = serde_json::Number::from_f64(avg) {
json_obj.insert(key.clone(), serde_json::Value::Number(num));
} else {
json_obj.insert(key.clone(), serde_json::Value::Null);
}
continue;
}
}
if let Ok(blob) = value.clone().into_blob() {
if is_hll_blob(&blob) {
if let Some(hll) = deserialize_hll(&blob) {
let cardinality = hll.len() as i64;
json_obj.insert(
key.clone(),
serde_json::Value::Number(serde_json::Number::from(cardinality)),
);
continue;
}
}
if let Some(digest) = deserialize_tdigest(&blob) {
if let Some(p_pos) = key.rfind("_p") {
if let Ok(percentile) = key[p_pos + 2..].parse::<f64>() {
let quantile = percentile / 100.0;
let percentile_value = digest.estimate_quantile(quantile);
if let Some(num) = serde_json::Number::from_f64(percentile_value) {
json_obj.insert(key.clone(), serde_json::Value::Number(num));
} else {
json_obj.insert(key.clone(), serde_json::Value::Null);
}
continue;
}
}
}
}
json_obj.insert(key.clone(), dynamic_to_json(value.clone()));
}
serde_json::to_string_pretty(&json_obj)
}
#[allow(dead_code)] pub fn extract_error_summary(metrics: &HashMap<String, Dynamic>) -> Option<String> {
let mut has_errors = false;
let mut summary = serde_json::Map::new();
let mut error_types: std::collections::HashSet<String> = std::collections::HashSet::new();
for key in metrics.keys() {
if let Some(suffix) = key.strip_prefix("__kelora_error_count_") {
error_types.insert(suffix.to_string());
}
}
for error_type in error_types {
let count_key = format!("__kelora_error_count_{}", error_type);
let examples_key = format!("__kelora_error_examples_{}", error_type);
if let Some(count_value) = metrics.get(&count_key) {
let count = count_value.as_int().unwrap_or(0);
if count > 0 {
has_errors = true;
let mut error_obj = serde_json::Map::new();
error_obj.insert(
"count".to_string(),
serde_json::Value::Number(serde_json::Number::from(count)),
);
if let Some(examples_value) = metrics.get(&examples_key) {
if let Ok(examples_array) = examples_value.clone().into_array() {
let examples: Vec<serde_json::Value> = examples_array
.iter()
.map(|v| {
serde_json::Value::String(
v.clone().into_string().unwrap_or_default(),
)
})
.collect();
error_obj
.insert("examples".to_string(), serde_json::Value::Array(examples));
}
}
summary.insert(error_type, serde_json::Value::Object(error_obj));
}
}
}
if has_errors {
Some(
serde_json::to_string_pretty(&summary)
.unwrap_or_else(|_| "Error serializing summary".to_string()),
)
} else {
None
}
}
#[cfg(test)]
mod tests {
use super::*;
use rhai::Dynamic;
fn clear_tracking_state() {
THREAD_TRACKING_STATE.with(|state| {
let mut snapshot = state.borrow_mut();
snapshot.user.clear();
snapshot.internal.clear();
});
}
#[test]
fn test_merge_numeric_integers() {
let result = merge_numeric(Some(Dynamic::from(5i64)), Dynamic::from(3i64));
assert_eq!(result.as_int().unwrap(), 8);
}
#[test]
fn test_merge_numeric_floats() {
let result = merge_numeric(Some(Dynamic::from(5.5f64)), Dynamic::from(3.2f64));
let value = result.as_float().unwrap();
assert!((value - 8.7).abs() < 0.001);
}
#[test]
fn test_merge_numeric_mixed_int_and_float() {
let result = merge_numeric(Some(Dynamic::from(5i64)), Dynamic::from(3.5f64));
let value = result.as_float().unwrap();
assert!((value - 8.5).abs() < 0.001);
}
#[test]
fn test_merge_numeric_no_existing() {
let result = merge_numeric(None, Dynamic::from(42i64));
assert_eq!(result.as_int().unwrap(), 42);
}
#[test]
fn test_get_set_thread_tracking_state() {
clear_tracking_state();
let mut metrics = HashMap::new();
metrics.insert("test_key".to_string(), Dynamic::from(123i64));
set_thread_tracking_state(&metrics);
let retrieved = get_thread_tracking_state();
assert_eq!(retrieved.len(), 1);
assert_eq!(retrieved.get("test_key").unwrap().as_int().unwrap(), 123);
clear_tracking_state();
}
#[test]
fn test_get_set_thread_internal_state() {
clear_tracking_state();
let mut internal = HashMap::new();
internal.insert("internal_key".to_string(), Dynamic::from(456i64));
set_thread_internal_state(&internal);
let retrieved = get_thread_internal_state();
assert_eq!(retrieved.len(), 1);
assert_eq!(
retrieved.get("internal_key").unwrap().as_int().unwrap(),
456
);
clear_tracking_state();
}
#[test]
fn test_with_user_tracking() {
clear_tracking_state();
with_user_tracking(|state| {
state.insert("key1".to_string(), Dynamic::from(100i64));
state.insert("key2".to_string(), Dynamic::from(200i64));
});
let retrieved = get_thread_tracking_state();
assert_eq!(retrieved.len(), 2);
assert_eq!(retrieved.get("key1").unwrap().as_int().unwrap(), 100);
assert_eq!(retrieved.get("key2").unwrap().as_int().unwrap(), 200);
clear_tracking_state();
}
#[test]
fn test_with_internal_tracking() {
clear_tracking_state();
with_internal_tracking(|state| {
state.insert("__internal1".to_string(), Dynamic::from(999i64));
});
let retrieved = get_thread_internal_state();
assert_eq!(retrieved.len(), 1);
assert_eq!(retrieved.get("__internal1").unwrap().as_int().unwrap(), 999);
clear_tracking_state();
}
#[test]
fn test_record_operation_metadata() {
clear_tracking_state();
record_operation_metadata("test_key", "count");
let internal = get_thread_internal_state();
assert!(internal.contains_key("__op_test_key"));
assert_eq!(
internal
.get("__op_test_key")
.unwrap()
.clone()
.into_string()
.unwrap(),
"count"
);
clear_tracking_state();
}
#[test]
fn test_tracking_snapshot_from_parts() {
let mut user = HashMap::new();
user.insert("user_key".to_string(), Dynamic::from(1i64));
let mut internal = HashMap::new();
internal.insert("internal_key".to_string(), Dynamic::from(2i64));
let snapshot = TrackingSnapshot::from_parts(user.clone(), internal.clone());
assert_eq!(snapshot.user.len(), 1);
assert_eq!(snapshot.internal.len(), 1);
assert_eq!(snapshot.user.get("user_key").unwrap().as_int().unwrap(), 1);
assert_eq!(
snapshot
.internal
.get("internal_key")
.unwrap()
.as_int()
.unwrap(),
2
);
}
#[test]
fn test_get_thread_snapshot() {
clear_tracking_state();
with_user_tracking(|state| {
state.insert("user_data".to_string(), Dynamic::from(111i64));
});
with_internal_tracking(|state| {
state.insert("internal_data".to_string(), Dynamic::from(222i64));
});
let snapshot = get_thread_snapshot();
assert_eq!(snapshot.user.len(), 1);
assert_eq!(snapshot.internal.len(), 1);
assert_eq!(
snapshot.user.get("user_data").unwrap().as_int().unwrap(),
111
);
assert_eq!(
snapshot
.internal
.get("internal_data")
.unwrap()
.as_int()
.unwrap(),
222
);
clear_tracking_state();
}
#[test]
fn test_has_errors_in_tracking_no_errors() {
let snapshot = TrackingSnapshot::default();
assert!(!has_errors_in_tracking(&snapshot));
}
#[test]
fn test_has_errors_in_tracking_with_errors() {
let mut internal = HashMap::new();
internal.insert(
"__kelora_error_count_parse".to_string(),
Dynamic::from(5i64),
);
let snapshot = TrackingSnapshot::from_parts(HashMap::new(), internal);
assert!(has_errors_in_tracking(&snapshot));
}
#[test]
fn test_has_errors_in_tracking_zero_count() {
let mut internal = HashMap::new();
internal.insert(
"__kelora_error_count_parse".to_string(),
Dynamic::from(0i64),
);
let snapshot = TrackingSnapshot::from_parts(HashMap::new(), internal);
assert!(!has_errors_in_tracking(&snapshot));
}
#[test]
fn test_format_metrics_output_empty() {
let metrics = HashMap::new();
let output = format_metrics_output(&metrics, 1);
assert_eq!(output, "No metrics tracked");
}
#[test]
fn test_format_metrics_output_simple_values() {
let mut metrics = HashMap::new();
metrics.insert("count".to_string(), Dynamic::from(42i64));
metrics.insert("sum".to_string(), Dynamic::from(2.5f64));
let output = format_metrics_output(&metrics, 1);
assert!(output.contains("count"));
assert!(output.contains("42"));
assert!(output.contains("sum"));
assert!(output.contains("2.5"));
}
#[test]
fn test_format_metrics_output_filters_internal_keys() {
let mut metrics = HashMap::new();
metrics.insert("user_metric".to_string(), Dynamic::from(100i64));
metrics.insert("__op_user_metric".to_string(), Dynamic::from("count"));
metrics.insert("__kelora_stats_lines".to_string(), Dynamic::from(50i64));
let output = format_metrics_output(&metrics, 1);
assert!(output.contains("user_metric"));
assert!(!output.contains("__op_"));
assert!(!output.contains("__kelora_stats_"));
}
#[test]
fn test_format_metrics_output_small_array() {
let mut metrics = HashMap::new();
let arr = vec![
Dynamic::from("val1"),
Dynamic::from("val2"),
Dynamic::from("val3"),
];
metrics.insert("unique_vals".to_string(), Dynamic::from(arr));
let output = format_metrics_output(&metrics, 1);
assert!(output.contains("unique_vals"));
assert!(output.contains("val1"));
assert!(output.contains("val2"));
assert!(output.contains("val3"));
}
#[test]
fn test_format_metrics_output_large_array_abbreviated() {
let mut metrics = HashMap::new();
let mut arr = Vec::new();
for i in 0..20 {
arr.push(Dynamic::from(format!("item{}", i)));
}
metrics.insert("large_array".to_string(), Dynamic::from(arr));
let output = format_metrics_output(&metrics, 1); assert!(output.contains("large_array"));
assert!(output.contains("20 unique"));
assert!(output.contains("item0"));
assert!(output.contains("item4"));
assert!(output.contains("[+15 more"));
}
#[test]
fn test_format_metrics_output_large_array_full() {
let mut metrics = HashMap::new();
let mut arr = Vec::new();
for i in 0..20 {
arr.push(Dynamic::from(format!("item{}", i)));
}
metrics.insert("large_array".to_string(), Dynamic::from(arr));
let output = format_metrics_output(&metrics, 2); assert!(output.contains("large_array"));
assert!(output.contains("20 unique"));
assert!(output.contains("item0"));
assert!(output.contains("item19"));
assert!(!output.contains("[+15 more")); }
#[test]
fn test_format_metrics_json_simple() {
let mut metrics = HashMap::new();
metrics.insert("count".to_string(), Dynamic::from(42i64));
let json = format_metrics_json(&metrics).unwrap();
assert!(json.contains("\"count\""));
assert!(json.contains("42"));
}
#[test]
fn test_format_metrics_json_filters_internal() {
let mut metrics = HashMap::new();
metrics.insert("user_metric".to_string(), Dynamic::from(100i64));
metrics.insert("__op_user_metric".to_string(), Dynamic::from("count"));
metrics.insert("__kelora_stats_lines".to_string(), Dynamic::from(50i64));
metrics.insert(
"__kelora_error_count_parse".to_string(),
Dynamic::from(5i64),
);
let json = format_metrics_json(&metrics).unwrap();
assert!(json.contains("\"user_metric\""));
assert!(!json.contains("\"__op_"));
assert!(!json.contains("\"__kelora_stats_"));
assert!(!json.contains("\"__kelora_error_"));
}
#[test]
fn test_format_metrics_json_array() {
let mut metrics = HashMap::new();
let arr = vec![
Dynamic::from(1i64),
Dynamic::from(2i64),
Dynamic::from(3i64),
];
metrics.insert("numbers".to_string(), Dynamic::from(arr));
let json = format_metrics_json(&metrics).unwrap();
assert!(json.contains("\"numbers\""));
assert!(json.contains("["));
assert!(json.contains("1"));
assert!(json.contains("2"));
assert!(json.contains("3"));
}
#[test]
fn test_format_metrics_json_map() {
let mut metrics = HashMap::new();
let mut map = rhai::Map::new();
map.insert("key1".into(), Dynamic::from(10i64));
map.insert("key2".into(), Dynamic::from(20i64));
metrics.insert("buckets".to_string(), Dynamic::from(map));
let json = format_metrics_json(&metrics).unwrap();
assert!(json.contains("\"buckets\""));
assert!(json.contains("\"key1\""));
assert!(json.contains("10"));
assert!(json.contains("\"key2\""));
assert!(json.contains("20"));
}
#[test]
fn test_dynamic_to_json_null() {
let json = dynamic_to_json(Dynamic::UNIT);
assert!(json.is_null());
}
#[test]
fn test_dynamic_to_json_integer() {
let json = dynamic_to_json(Dynamic::from(42i64));
assert_eq!(json.as_i64().unwrap(), 42);
}
#[test]
fn test_dynamic_to_json_float() {
let json = dynamic_to_json(Dynamic::from(2.5f64));
let val = json.as_f64().unwrap();
assert!((val - 2.5).abs() < 0.001);
}
#[test]
fn test_dynamic_to_json_string() {
let json = dynamic_to_json(Dynamic::from("hello"));
assert_eq!(json.as_str().unwrap(), "hello");
}
#[test]
fn test_dynamic_to_json_bool() {
let json = dynamic_to_json(Dynamic::from(true));
assert!(json.as_bool().unwrap());
}
#[test]
fn test_dynamic_to_json_array() {
let arr = vec![
Dynamic::from(1i64),
Dynamic::from(2i64),
Dynamic::from(3i64),
];
let json = dynamic_to_json(Dynamic::from(arr));
assert!(json.is_array());
let array = json.as_array().unwrap();
assert_eq!(array.len(), 3);
assert_eq!(array[0].as_i64().unwrap(), 1);
assert_eq!(array[1].as_i64().unwrap(), 2);
assert_eq!(array[2].as_i64().unwrap(), 3);
}
#[test]
fn test_dynamic_to_json_map() {
let mut map = rhai::Map::new();
map.insert("a".into(), Dynamic::from(100i64));
map.insert("b".into(), Dynamic::from(200i64));
let json = dynamic_to_json(Dynamic::from(map));
assert!(json.is_object());
let obj = json.as_object().unwrap();
assert_eq!(obj.get("a").unwrap().as_i64().unwrap(), 100);
assert_eq!(obj.get("b").unwrap().as_i64().unwrap(), 200);
}
#[test]
fn test_format_error_location_single_file() {
let input_files = vec!["test.log".to_string()];
let location = format_error_location(Some(42), Some("test.log"), &input_files);
assert_eq!(location, "line 42");
}
#[test]
fn test_format_error_location_stdin() {
let input_files: Vec<String> = vec![];
let location = format_error_location(Some(10), None, &input_files);
assert_eq!(location, "line 10");
}
#[test]
fn test_format_error_location_multiple_files_no_conflict() {
let input_files = vec!["file1.log".to_string(), "file2.log".to_string()];
let location = format_error_location(Some(100), Some("file1.log"), &input_files);
assert_eq!(location, "file1.log:100");
}
#[test]
fn test_format_error_location_multiple_files_with_conflict() {
let input_files = vec![
"/path/to/file.log".to_string(),
"/other/path/file.log".to_string(),
];
let location = format_error_location(Some(50), Some("/path/to/file.log"), &input_files);
assert_eq!(location, "/path/to/file.log:50");
}
#[test]
fn test_format_error_location_no_line_number() {
let input_files = vec!["test.log".to_string()];
let location = format_error_location(None, Some("test.log"), &input_files);
assert_eq!(location, "unknown");
}
#[test]
fn test_extract_error_summary_no_errors() {
let metrics = HashMap::new();
let summary = extract_error_summary(&metrics);
assert!(summary.is_none());
}
#[test]
fn test_extract_error_summary_with_errors() {
let mut metrics = HashMap::new();
metrics.insert(
"__kelora_error_count_parse".to_string(),
Dynamic::from(5i64),
);
let arr = vec![Dynamic::from("example error 1")];
metrics.insert(
"__kelora_error_examples_parse".to_string(),
Dynamic::from(arr),
);
let summary = extract_error_summary(&metrics);
assert!(summary.is_some());
let text = summary.unwrap();
assert!(text.contains("parse"));
assert!(text.contains("\"count\": 5"));
}
#[test]
fn test_extract_error_summary_zero_errors() {
let mut metrics = HashMap::new();
metrics.insert(
"__kelora_error_count_parse".to_string(),
Dynamic::from(0i64),
);
let summary = extract_error_summary(&metrics);
assert!(summary.is_none());
}
#[test]
fn test_extract_error_summary_from_tracking_no_errors() {
let snapshot = TrackingSnapshot::default();
let summary = extract_error_summary_from_tracking(&snapshot, 0, None, None);
assert!(summary.is_none());
}
#[test]
fn test_extract_error_summary_from_tracking_with_errors() {
let mut internal = HashMap::new();
internal.insert(
"__kelora_error_count_parse".to_string(),
Dynamic::from(3i64),
);
let mut sample_obj = rhai::Map::new();
sample_obj.insert("error_type".into(), Dynamic::from("parse"));
sample_obj.insert("line_num".into(), Dynamic::from(42i64));
sample_obj.insert("message".into(), Dynamic::from("Test error"));
sample_obj.insert("filename".into(), Dynamic::from("test.log"));
let samples = vec![Dynamic::from(sample_obj)];
internal.insert(
"__kelora_error_samples_parse".to_string(),
Dynamic::from(samples),
);
let snapshot = TrackingSnapshot::from_parts(HashMap::new(), internal);
let summary = extract_error_summary_from_tracking(&snapshot, 0, None, None);
assert!(summary.is_some());
let text = summary.unwrap();
assert!(text.contains("Parse errors: 3 total"));
assert!(text.contains("Test error"));
}
#[test]
fn test_extract_error_summary_adds_yearless_warning() {
let mut internal = HashMap::new();
internal.insert(
"__kelora_error_count_parse".to_string(),
Dynamic::from(2i64),
);
let snapshot = TrackingSnapshot::from_parts(HashMap::new(), internal);
let stats = ProcessingStats {
yearless_timestamps: 5,
..Default::default()
};
let summary =
extract_error_summary_from_tracking(&snapshot, 0, Some(&stats), None).unwrap();
assert!(summary.contains("Year-less timestamp format detected"));
assert!(summary.contains("5 parse"));
}
#[test]
fn test_merge_numeric_edge_case_zero_plus_zero() {
let result = merge_numeric(Some(Dynamic::from(0i64)), Dynamic::from(0i64));
assert_eq!(result.as_int().unwrap(), 0);
}
#[test]
fn test_merge_numeric_edge_case_negative_numbers() {
let result = merge_numeric(Some(Dynamic::from(-5i64)), Dynamic::from(-3i64));
assert_eq!(result.as_int().unwrap(), -8);
}
#[test]
fn test_merge_numeric_edge_case_large_integers() {
let result = merge_numeric(
Some(Dynamic::from(1_000_000_000i64)),
Dynamic::from(2_000_000_000i64),
);
assert_eq!(result.as_int().unwrap(), 3_000_000_000i64);
}
#[test]
fn test_thread_tracking_isolation() {
clear_tracking_state();
with_user_tracking(|state| {
state.insert("test".to_string(), Dynamic::from(1i64));
});
let state1 = get_thread_tracking_state();
assert_eq!(state1.get("test").unwrap().as_int().unwrap(), 1);
clear_tracking_state();
let state2 = get_thread_tracking_state();
assert!(state2.is_empty());
}
#[test]
fn test_track_top_count_mode() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_top("test", "apple", 3)"#)
.unwrap();
engine
.eval::<()>(r#"track_top("test", "banana", 3)"#)
.unwrap();
engine
.eval::<()>(r#"track_top("test", "apple", 3)"#)
.unwrap();
engine
.eval::<()>(r#"track_top("test", "cherry", 3)"#)
.unwrap();
engine
.eval::<()>(r#"track_top("test", "apple", 3)"#)
.unwrap();
let state = get_thread_tracking_state();
let result = state.get("test").unwrap().clone().into_array().unwrap();
assert_eq!(result.len(), 3);
let first = result[0].clone().try_cast::<rhai::Map>().unwrap();
assert_eq!(
first.get("key").unwrap().clone().into_string().unwrap(),
"apple"
);
assert_eq!(first.get("count").unwrap().as_int().unwrap(), 3);
let second = result[1].clone().try_cast::<rhai::Map>().unwrap();
assert_eq!(second.get("count").unwrap().as_int().unwrap(), 1);
clear_tracking_state();
}
#[test]
fn test_track_top_n_limit() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_top("test", "item1", 2)"#)
.unwrap();
engine
.eval::<()>(r#"track_top("test", "item2", 2)"#)
.unwrap();
engine
.eval::<()>(r#"track_top("test", "item2", 2)"#)
.unwrap();
engine
.eval::<()>(r#"track_top("test", "item3", 2)"#)
.unwrap();
engine
.eval::<()>(r#"track_top("test", "item4", 2)"#)
.unwrap();
engine
.eval::<()>(r#"track_top("test", "item5", 2)"#)
.unwrap();
let state = get_thread_tracking_state();
let result = state.get("test").unwrap().clone().into_array().unwrap();
assert_eq!(result.len(), 2);
let first = result[0].clone().try_cast::<rhai::Map>().unwrap();
assert_eq!(
first.get("key").unwrap().clone().into_string().unwrap(),
"item2"
);
assert_eq!(first.get("count").unwrap().as_int().unwrap(), 2);
clear_tracking_state();
}
#[test]
fn test_track_top_weighted_mode() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_top("slow", "/api/users", 2, 150)"#)
.unwrap();
engine
.eval::<()>(r#"track_top("slow", "/api/products", 2, 50)"#)
.unwrap();
engine
.eval::<()>(r#"track_top("slow", "/api/users", 2, 200)"#)
.unwrap(); engine
.eval::<()>(r#"track_top("slow", "/api/orders", 2, 75)"#)
.unwrap();
let state = get_thread_tracking_state();
let result = state.get("slow").unwrap().clone().into_array().unwrap();
assert_eq!(result.len(), 2);
let first = result[0].clone().try_cast::<rhai::Map>().unwrap();
assert_eq!(
first.get("key").unwrap().clone().into_string().unwrap(),
"/api/users"
);
assert_eq!(first.get("value").unwrap().as_float().unwrap(), 200.0);
let second = result[1].clone().try_cast::<rhai::Map>().unwrap();
assert_eq!(
second.get("key").unwrap().clone().into_string().unwrap(),
"/api/orders"
);
assert_eq!(second.get("value").unwrap().as_float().unwrap(), 75.0);
clear_tracking_state();
}
#[test]
fn test_track_bottom_count_mode() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_bottom("test", "apple", 3)"#)
.unwrap();
engine
.eval::<()>(r#"track_bottom("test", "apple", 3)"#)
.unwrap();
engine
.eval::<()>(r#"track_bottom("test", "apple", 3)"#)
.unwrap();
engine
.eval::<()>(r#"track_bottom("test", "banana", 3)"#)
.unwrap();
engine
.eval::<()>(r#"track_bottom("test", "banana", 3)"#)
.unwrap();
engine
.eval::<()>(r#"track_bottom("test", "cherry", 3)"#)
.unwrap();
engine
.eval::<()>(r#"track_bottom("test", "date", 3)"#)
.unwrap();
let state = get_thread_tracking_state();
let result = state.get("test").unwrap().clone().into_array().unwrap();
assert_eq!(result.len(), 3);
let first = result[0].clone().try_cast::<rhai::Map>().unwrap();
assert_eq!(first.get("count").unwrap().as_int().unwrap(), 1);
assert_eq!(
first.get("key").unwrap().clone().into_string().unwrap(),
"cherry"
);
let second = result[1].clone().try_cast::<rhai::Map>().unwrap();
assert_eq!(second.get("count").unwrap().as_int().unwrap(), 1);
assert_eq!(
second.get("key").unwrap().clone().into_string().unwrap(),
"date"
);
let third = result[2].clone().try_cast::<rhai::Map>().unwrap();
assert_eq!(third.get("count").unwrap().as_int().unwrap(), 2);
assert_eq!(
third.get("key").unwrap().clone().into_string().unwrap(),
"banana"
);
clear_tracking_state();
}
#[test]
fn test_track_bottom_weighted_mode() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_bottom("fast", "/api/users", 2, 150.5)"#)
.unwrap();
engine
.eval::<()>(r#"track_bottom("fast", "/api/products", 2, 30.0)"#)
.unwrap();
engine
.eval::<()>(r#"track_bottom("fast", "/api/users", 2, 100.0)"#)
.unwrap(); engine
.eval::<()>(r#"track_bottom("fast", "/api/orders", 2, 75.0)"#)
.unwrap();
let state = get_thread_tracking_state();
let result = state.get("fast").unwrap().clone().into_array().unwrap();
assert_eq!(result.len(), 2);
let first = result[0].clone().try_cast::<rhai::Map>().unwrap();
assert_eq!(
first.get("key").unwrap().clone().into_string().unwrap(),
"/api/products"
);
assert_eq!(first.get("value").unwrap().as_float().unwrap(), 30.0);
let second = result[1].clone().try_cast::<rhai::Map>().unwrap();
assert_eq!(
second.get("key").unwrap().clone().into_string().unwrap(),
"/api/orders"
);
assert_eq!(second.get("value").unwrap().as_float().unwrap(), 75.0);
clear_tracking_state();
}
#[test]
fn test_track_bottom_unit_item() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine.eval::<()>(r#"track_bottom("test", (), 3)"#).unwrap();
engine
.eval::<()>(r#"track_bottom("test", (), 3, 10)"#)
.unwrap();
let state = get_thread_tracking_state();
assert!(
!state.contains_key("test")
|| state
.get("test")
.unwrap()
.clone()
.into_array()
.unwrap()
.is_empty()
);
clear_tracking_state();
}
#[test]
fn test_track_top_invalid_n() {
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
let result = engine.eval::<()>(r#"track_top("test", "item", 0)"#);
assert!(result.is_err());
let result = engine.eval::<()>(r#"track_top("test", "item", -1)"#);
assert!(result.is_err());
}
#[test]
fn test_track_top_unit_value() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_top("test", "apple", 3, ())"#)
.unwrap();
let state = get_thread_tracking_state();
assert!(
!state.contains_key("test")
|| state
.get("test")
.unwrap()
.clone()
.into_array()
.unwrap()
.is_empty()
);
clear_tracking_state();
}
#[test]
fn test_track_top_unit_item() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine.eval::<()>(r#"track_top("test", (), 3)"#).unwrap();
engine
.eval::<()>(r#"track_top("test", (), 3, 10)"#)
.unwrap();
let state = get_thread_tracking_state();
assert!(
!state.contains_key("test")
|| state
.get("test")
.unwrap()
.clone()
.into_array()
.unwrap()
.is_empty()
);
clear_tracking_state();
}
#[test]
fn test_track_top_stable_sort() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_top("test", "zebra", 5)"#)
.unwrap();
engine
.eval::<()>(r#"track_top("test", "apple", 5)"#)
.unwrap();
engine
.eval::<()>(r#"track_top("test", "mango", 5)"#)
.unwrap();
let state = get_thread_tracking_state();
let result = state.get("test").unwrap().clone().into_array().unwrap();
let first = result[0].clone().try_cast::<rhai::Map>().unwrap();
let second = result[1].clone().try_cast::<rhai::Map>().unwrap();
let third = result[2].clone().try_cast::<rhai::Map>().unwrap();
assert_eq!(
first.get("key").unwrap().clone().into_string().unwrap(),
"apple"
);
assert_eq!(
second.get("key").unwrap().clone().into_string().unwrap(),
"mango"
);
assert_eq!(
third.get("key").unwrap().clone().into_string().unwrap(),
"zebra"
);
clear_tracking_state();
}
#[test]
fn test_track_percentiles_basic() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_percentiles("latency", 100, [0.50, 0.95, 0.99])"#)
.unwrap();
engine
.eval::<()>(r#"track_percentiles("latency", 200, [0.50, 0.95, 0.99])"#)
.unwrap();
engine
.eval::<()>(r#"track_percentiles("latency", 150, [0.50, 0.95, 0.99])"#)
.unwrap();
engine
.eval::<()>(r#"track_percentiles("latency", 300, [0.50, 0.95, 0.99])"#)
.unwrap();
engine
.eval::<()>(r#"track_percentiles("latency", 250, [0.50, 0.95, 0.99])"#)
.unwrap();
let state = get_thread_tracking_state();
assert!(state.contains_key("latency_p50"));
assert!(state.contains_key("latency_p95"));
assert!(state.contains_key("latency_p99"));
assert!(state.get("latency_p50").unwrap().is_blob());
assert!(state.get("latency_p95").unwrap().is_blob());
assert!(state.get("latency_p99").unwrap().is_blob());
clear_tracking_state();
}
#[test]
fn test_track_percentiles_single() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_percentiles("response_time", 123.45, [0.95])"#)
.unwrap();
let state = get_thread_tracking_state();
assert!(state.contains_key("response_time_p95"));
assert!(state.get("response_time_p95").unwrap().is_blob());
clear_tracking_state();
}
#[test]
fn test_track_percentiles_dedup() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_percentiles("test", 100, [0.95, 0.95, 0.99, 0.95])"#)
.unwrap();
let state = get_thread_tracking_state();
assert!(state.contains_key("test_p95"));
assert!(state.contains_key("test_p99"));
clear_tracking_state();
}
#[test]
fn test_track_percentiles_invalid_percentile() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
let result = engine.eval::<()>(r#"track_percentiles("test", 100, [1.01])"#);
assert!(result.is_err());
let result = engine.eval::<()>(r#"track_percentiles("test", 100, [-0.5])"#);
assert!(result.is_err());
clear_tracking_state();
}
#[test]
fn test_track_percentiles_empty_array() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
let result = engine.eval::<()>(r#"track_percentiles("test", 100, [])"#);
assert!(result.is_err());
clear_tracking_state();
}
#[test]
fn test_track_percentiles_unit_value() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_percentiles("test", (), [0.95])"#)
.unwrap();
let state = get_thread_tracking_state();
assert!(!state.contains_key("test_p95"));
clear_tracking_state();
}
#[test]
fn test_track_percentiles_default() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_percentiles("latency", 100)"#)
.unwrap();
engine
.eval::<()>(r#"track_percentiles("latency", 200)"#)
.unwrap();
let state = get_thread_tracking_state();
assert!(state.contains_key("latency_p50"));
assert!(state.contains_key("latency_p95"));
assert!(state.contains_key("latency_p99"));
clear_tracking_state();
}
#[test]
fn test_track_percentiles_decimal_suffix() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_percentiles("latency", 100, [0.999, 0.9999])"#)
.unwrap();
let state = get_thread_tracking_state();
assert!(state.contains_key("latency_p99.9"));
assert!(state.contains_key("latency_p99.99"));
clear_tracking_state();
}
#[test]
fn test_track_percentiles_unit_value_default() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_percentiles("test", ())"#)
.unwrap();
let state = get_thread_tracking_state();
assert!(!state.contains_key("test_p50"));
assert!(!state.contains_key("test_p95"));
assert!(!state.contains_key("test_p99"));
clear_tracking_state();
}
#[test]
fn test_track_stats_basic() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_stats("response_time", 100)"#)
.unwrap();
engine
.eval::<()>(r#"track_stats("response_time", 200)"#)
.unwrap();
engine
.eval::<()>(r#"track_stats("response_time", 150)"#)
.unwrap();
let state = get_thread_tracking_state();
assert!(state.contains_key("response_time_min"));
assert!(state.contains_key("response_time_max"));
assert!(state.contains_key("response_time_avg"));
assert!(state.contains_key("response_time_count"));
assert!(state.contains_key("response_time_sum"));
assert!(state.contains_key("response_time_p50"));
assert!(state.contains_key("response_time_p95"));
assert!(state.contains_key("response_time_p99"));
assert_eq!(
state
.get("response_time_min")
.unwrap()
.as_float()
.unwrap_or(0.0),
100.0
);
assert_eq!(
state
.get("response_time_max")
.unwrap()
.as_float()
.unwrap_or(0.0),
200.0
);
assert_eq!(
state
.get("response_time_count")
.unwrap()
.as_int()
.unwrap_or(0),
3
);
assert_eq!(
state
.get("response_time_sum")
.unwrap()
.as_float()
.unwrap_or(0.0),
450.0
);
let avg_map = state
.get("response_time_avg")
.unwrap()
.clone()
.try_cast::<rhai::Map>()
.unwrap();
assert_eq!(avg_map.get("sum").unwrap().as_float().unwrap_or(0.0), 450.0);
assert_eq!(avg_map.get("count").unwrap().as_int().unwrap_or(0), 3);
clear_tracking_state();
}
#[test]
fn test_track_stats_custom_percentiles() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_stats("latency", 100, [0.50, 0.90, 0.99, 0.999])"#)
.unwrap();
engine
.eval::<()>(r#"track_stats("latency", 200, [0.50, 0.90, 0.99, 0.999])"#)
.unwrap();
let state = get_thread_tracking_state();
assert!(state.contains_key("latency_p50"));
assert!(state.contains_key("latency_p90"));
assert!(state.contains_key("latency_p99"));
assert!(state.contains_key("latency_p99.9"));
assert!(state.contains_key("latency_min"));
assert!(state.contains_key("latency_max"));
assert!(state.contains_key("latency_avg"));
clear_tracking_state();
}
#[test]
fn test_track_stats_unit_value() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine.eval::<()>(r#"track_stats("test", ())"#).unwrap();
let state = get_thread_tracking_state();
assert!(!state.contains_key("test_min"));
assert!(!state.contains_key("test_max"));
assert!(!state.contains_key("test_avg"));
assert!(!state.contains_key("test_count"));
assert!(!state.contains_key("test_sum"));
assert!(!state.contains_key("test_p50"));
clear_tracking_state();
}
#[test]
fn test_track_stats_multiple_types() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine.eval::<()>(r#"track_stats("mixed", 100)"#).unwrap(); engine.eval::<()>(r#"track_stats("mixed", 150.5)"#).unwrap();
let state = get_thread_tracking_state();
assert_eq!(state.get("mixed_count").unwrap().as_int().unwrap_or(0), 2);
let sum = state.get("mixed_sum").unwrap().as_float().unwrap_or(0.0);
assert!((sum - 250.5).abs() < 0.001);
clear_tracking_state();
}
#[test]
fn test_track_cardinality_basic() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_cardinality("users", "alice")"#)
.unwrap();
engine
.eval::<()>(r#"track_cardinality("users", "bob")"#)
.unwrap();
engine
.eval::<()>(r#"track_cardinality("users", "charlie")"#)
.unwrap();
engine
.eval::<()>(r#"track_cardinality("users", "alice")"#) .unwrap();
let state = get_thread_tracking_state();
assert!(state.contains_key("users"));
let value = state.get("users").unwrap();
assert!(value.is_blob());
let blob = value.clone().into_blob().unwrap();
assert!(is_hll_blob(&blob));
let hll = deserialize_hll(&blob).unwrap();
let cardinality = hll.len();
assert!((2.0..=4.0).contains(&cardinality));
clear_tracking_state();
}
#[test]
fn test_track_cardinality_integers() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
for i in 1..=100 {
engine
.eval::<()>(&format!(r#"track_cardinality("numbers", {})"#, i))
.unwrap();
}
let state = get_thread_tracking_state();
let blob = state.get("numbers").unwrap().clone().into_blob().unwrap();
let hll = deserialize_hll(&blob).unwrap();
let cardinality = hll.len();
assert!((90.0..=110.0).contains(&cardinality));
clear_tracking_state();
}
#[test]
fn test_track_cardinality_unit_value() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_cardinality("test", ())"#)
.unwrap();
let state = get_thread_tracking_state();
assert!(!state.contains_key("test"));
clear_tracking_state();
}
#[test]
fn test_track_cardinality_with_custom_error_rate() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
for i in 1..=100 {
engine
.eval::<()>(&format!(r#"track_cardinality("precise", {}, 0.005)"#, i))
.unwrap();
}
let state = get_thread_tracking_state();
let blob = state.get("precise").unwrap().clone().into_blob().unwrap();
let hll = deserialize_hll(&blob).unwrap();
let cardinality = hll.len();
assert!((95.0..=105.0).contains(&cardinality));
clear_tracking_state();
}
#[test]
fn test_track_cardinality_operation_metadata() {
clear_tracking_state();
let mut engine = rhai::Engine::new();
register_functions(&mut engine);
engine
.eval::<()>(r#"track_cardinality("ips", "192.168.1.1")"#)
.unwrap();
let internal = get_thread_internal_state();
assert!(internal.contains_key("__op_ips"));
assert_eq!(
internal
.get("__op_ips")
.unwrap()
.clone()
.into_string()
.unwrap(),
"cardinality"
);
clear_tracking_state();
}
#[test]
fn test_hll_serialization_roundtrip() {
let mut hll = new_hll();
hll.insert(&"test1".to_string());
hll.insert(&"test2".to_string());
hll.insert(&"test3".to_string());
let bytes = serialize_hll(&hll);
assert!(is_hll_blob(&bytes));
let restored = deserialize_hll(&bytes).unwrap();
assert!((hll.len() - restored.len()).abs() < 0.001);
}
}