use rhai::{Dynamic, Engine};
use std::cell::RefCell;
use std::collections::HashMap;
thread_local! {
    /// Per-thread store written by the `track_*` Rhai functions and by `track_error`.
    ///
    /// Besides user metric values keyed by name, this module writes bookkeeping entries:
    /// `__op_<key>` (name of the operation that last wrote `<key>`, e.g. `"count"`, `"min"`,
    /// `"unique"`, `"bucket"`), `__kelora_error_count_<type>` and
    /// `__kelora_error_samples_<type>`.
    pub static THREAD_TRACKING_STATE: RefCell<HashMap<String, Dynamic>> =
        RefCell::new(HashMap::default());
}
/// Records one occurrence of an `error_type` error.
///
/// Side effects:
/// - bumps the global stats counter (`stats_add_line_error` for `"parse"` errors,
///   `stats_add_error` for every other type);
/// - increments `__kelora_error_count_<error_type>` in `THREAD_TRACKING_STATE`
///   (merge op `"count"`);
/// - when `verbose && !quiet`, formats the error and passes it to `capture_stderr`; outside
///   parallel mode it is also printed to stderr;
/// - keeps up to `MAX_ERROR_SAMPLES` formatted messages in
///   `__kelora_error_samples_<error_type>` (merge op `"unique"`).
pub fn track_error(
    error_type: &str,
    line_num: Option<usize>,
    message: &str,
    verbose: bool,
    quiet: bool,
    config: Option<&crate::pipeline::PipelineConfig>,
) {
    /// How many formatted messages are retained per error type.
    const MAX_ERROR_SAMPLES: usize = 3;

    if error_type == "parse" {
        crate::stats::stats_add_line_error();
    } else {
        crate::stats::stats_add_error();
    }

    // The formatted message is needed for verbose output and/or a new sample.
    // Build it lazily and at most once.
    let mut formatted: Option<String> = None;
    let render = |cache: &mut Option<String>| -> String {
        cache
            .get_or_insert_with(|| {
                crate::config::format_verbose_error_with_pipeline_config(
                    line_num,
                    &format!("{} error", error_type),
                    message,
                    config,
                )
            })
            .clone()
    };

    THREAD_TRACKING_STATE.with(|state| {
        let mut state = state.borrow_mut();

        // Missing or non-integer counters start from zero.
        let count_key = format!("__kelora_error_count_{}", error_type);
        let new_count = state
            .get(&count_key)
            .and_then(|v| v.as_int().ok())
            .unwrap_or(0)
            + 1;
        state.insert(format!("__op_{}", count_key), Dynamic::from("count"));
        state.insert(count_key, Dynamic::from(new_count));

        if verbose && !quiet {
            let text = render(&mut formatted);
            if crate::rhai_functions::strings::is_parallel_mode() {
                // Parallel mode only captures; printing is left to whoever drains the capture.
                crate::rhai_functions::strings::capture_stderr(text);
            } else {
                crate::rhai_functions::strings::capture_stderr(text.clone());
                eprintln!("{}", text);
            }
        }

        let samples_key = format!("__kelora_error_samples_{}", error_type);
        let current_samples = state
            .get(&samples_key)
            .cloned()
            .unwrap_or_else(|| Dynamic::from(rhai::Array::new()));
        // A non-array value under the samples key is left untouched.
        if let Ok(mut samples) = current_samples.into_array() {
            if samples.len() < MAX_ERROR_SAMPLES {
                samples.push(Dynamic::from(render(&mut formatted)));
            }
            state.insert(format!("__op_{}", samples_key), Dynamic::from("unique"));
            state.insert(samples_key, Dynamic::from(samples));
        }
    });
}
/// Returns `true` if any `__kelora_error_count_*` entry in `tracked` holds a positive integer.
#[allow(dead_code)]
pub fn has_errors_in_tracking(tracked: &HashMap<String, Dynamic>) -> bool {
    tracked.iter().any(|(name, entry)| {
        name.starts_with("__kelora_error_count_")
            && matches!(entry.as_int(), Ok(n) if n > 0)
    })
}
#[allow(dead_code)] pub fn extract_error_summary_from_tracking(
tracked: &HashMap<String, Dynamic>,
verbose: bool,
) -> Option<String> {
let mut total_errors = 0;
let mut error_types = Vec::new();
let mut samples = Vec::new();
for (key, value) in tracked {
if let Some(error_type) = key.strip_prefix("__kelora_error_count_") {
if let Ok(count) = value.as_int() {
if count > 0 {
total_errors += count;
error_types.push((error_type.to_string(), count));
}
}
}
}
if total_errors == 0 {
return None;
}
if verbose {
for (key, value) in tracked {
if let Some(_error_type) = key.strip_prefix("__kelora_error_samples_") {
if let Ok(sample_array) = value.clone().into_array() {
for sample in sample_array {
if let Ok(sample_msg) = sample.into_string() {
samples.push(sample_msg);
}
}
}
}
}
}
let mut summary = String::new();
if error_types.len() == 1 && error_types[0].0 == "parse" {
let count = error_types[0].1;
if count == 1 {
summary.push_str("Processing completed with 1 parse error");
} else {
summary.push_str(&format!("Processing completed with {} parse errors", count));
}
} else if error_types.len() == 1 {
let (error_type, count) = &error_types[0];
if *count == 1 {
summary.push_str(&format!("Processing completed with 1 {} error", error_type));
} else {
summary.push_str(&format!(
"Processing completed with {} {} errors",
count, error_type
));
}
} else {
summary.push_str(&format!(
"Processing completed with {} errors: ",
total_errors
));
let type_summaries: Vec<String> = error_types
.iter()
.map(|(error_type, count)| format!("{} {}", count, error_type))
.collect();
summary.push_str(&type_summaries.join(", "));
}
if verbose && !samples.is_empty() {
summary.push_str("\n\nError examples:");
for sample in samples {
summary.push_str(&format!("\n {}", sample));
}
}
Some(summary)
}
pub fn register_functions(engine: &mut Engine) {
engine.register_fn("track_count", |key: &str| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let count = state.get(key).cloned().unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + 1;
state.insert(key.to_string(), Dynamic::from(new_count));
state.insert(format!("__op_{}", key), Dynamic::from("count"));
});
});
engine.register_fn("track_count", |key: &str, delta: i64| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let count = state.get(key).cloned().unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + delta;
state.insert(key.to_string(), Dynamic::from(new_count));
state.insert(format!("__op_{}", key), Dynamic::from("count"));
});
});
engine.register_fn("track_count", |key: &str, delta: i32| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let count = state.get(key).cloned().unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + (delta as i64);
state.insert(key.to_string(), Dynamic::from(new_count));
state.insert(format!("__op_{}", key), Dynamic::from("count"));
});
});
engine.register_fn("track_count", |key: &str, delta: f64| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let count = state.get(key).cloned().unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + (delta as i64);
state.insert(key.to_string(), Dynamic::from(new_count));
state.insert(format!("__op_{}", key), Dynamic::from("count"));
});
});
engine.register_fn("track_count", |key: &str, delta: f32| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let count = state.get(key).cloned().unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + (delta as i64);
state.insert(key.to_string(), Dynamic::from(new_count));
state.insert(format!("__op_{}", key), Dynamic::from("count"));
});
});
engine.register_fn("track_min", |key: &str, value: i64| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state
.get(key)
.cloned()
.unwrap_or(Dynamic::from(f64::INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MAX) as f64
} else {
current.as_float().unwrap_or(f64::INFINITY)
};
let value_f64 = value as f64;
if value_f64 < current_val {
state.insert(key.to_string(), Dynamic::from(value));
state.insert(format!("__op_{}", key), Dynamic::from("min"));
}
});
});
engine.register_fn("track_min", |key: &str, value: i32| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state
.get(key)
.cloned()
.unwrap_or(Dynamic::from(f64::INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MAX) as f64
} else {
current.as_float().unwrap_or(f64::INFINITY)
};
let value_f64 = value as f64;
if value_f64 < current_val {
state.insert(key.to_string(), Dynamic::from(value));
state.insert(format!("__op_{}", key), Dynamic::from("min"));
}
});
});
engine.register_fn("track_min", |key: &str, value: f64| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state
.get(key)
.cloned()
.unwrap_or(Dynamic::from(f64::INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MAX) as f64
} else {
current.as_float().unwrap_or(f64::INFINITY)
};
if value < current_val {
state.insert(key.to_string(), Dynamic::from(value));
state.insert(format!("__op_{}", key), Dynamic::from("min"));
}
});
});
engine.register_fn("track_min", |key: &str, value: f32| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state
.get(key)
.cloned()
.unwrap_or(Dynamic::from(f64::INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MAX) as f64
} else {
current.as_float().unwrap_or(f64::INFINITY)
};
let value_f64 = value as f64;
if value_f64 < current_val {
state.insert(key.to_string(), Dynamic::from(value));
state.insert(format!("__op_{}", key), Dynamic::from("min"));
}
});
});
engine.register_fn("track_max", |key: &str, value: i64| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state
.get(key)
.cloned()
.unwrap_or(Dynamic::from(f64::NEG_INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MIN) as f64
} else {
current.as_float().unwrap_or(f64::NEG_INFINITY)
};
let value_f64 = value as f64;
if value_f64 > current_val {
state.insert(key.to_string(), Dynamic::from(value));
state.insert(format!("__op_{}", key), Dynamic::from("max"));
}
});
});
engine.register_fn("track_max", |key: &str, value: i32| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state
.get(key)
.cloned()
.unwrap_or(Dynamic::from(f64::NEG_INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MIN) as f64
} else {
current.as_float().unwrap_or(f64::NEG_INFINITY)
};
let value_f64 = value as f64;
if value_f64 > current_val {
state.insert(key.to_string(), Dynamic::from(value));
state.insert(format!("__op_{}", key), Dynamic::from("max"));
}
});
});
engine.register_fn("track_max", |key: &str, value: f64| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state
.get(key)
.cloned()
.unwrap_or(Dynamic::from(f64::NEG_INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MIN) as f64
} else {
current.as_float().unwrap_or(f64::NEG_INFINITY)
};
if value > current_val {
state.insert(key.to_string(), Dynamic::from(value));
state.insert(format!("__op_{}", key), Dynamic::from("max"));
}
});
});
engine.register_fn("track_max", |key: &str, value: f32| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state
.get(key)
.cloned()
.unwrap_or(Dynamic::from(f64::NEG_INFINITY));
let current_val = if current.is_int() {
current.as_int().unwrap_or(i64::MIN) as f64
} else {
current.as_float().unwrap_or(f64::NEG_INFINITY)
};
let value_f64 = value as f64;
if value_f64 > current_val {
state.insert(key.to_string(), Dynamic::from(value));
state.insert(format!("__op_{}", key), Dynamic::from("max"));
}
});
});
engine.register_fn("track_unique", |key: &str, value: &str| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state.get(key).cloned().unwrap_or_else(|| {
Dynamic::from(rhai::Array::new())
});
if let Ok(mut arr) = current.into_array() {
let value_dynamic = Dynamic::from(value.to_string());
if !arr
.iter()
.any(|v| v.clone().into_string().unwrap_or_default() == value)
{
arr.push(value_dynamic);
}
state.insert(key.to_string(), Dynamic::from(arr));
state.insert(format!("__op_{}", key), Dynamic::from("unique"));
}
});
});
engine.register_fn("track_unique", |key: &str, value: i64| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state.get(key).cloned().unwrap_or_else(|| {
Dynamic::from(rhai::Array::new())
});
if let Ok(mut arr) = current.into_array() {
let value_dynamic = Dynamic::from(value);
if !arr.iter().any(|v| v.as_int().unwrap_or(i64::MIN) == value) {
arr.push(value_dynamic);
}
state.insert(key.to_string(), Dynamic::from(arr));
state.insert(format!("__op_{}", key), Dynamic::from("unique"));
}
});
});
engine.register_fn("track_unique", |key: &str, value: i32| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state.get(key).cloned().unwrap_or_else(|| {
Dynamic::from(rhai::Array::new())
});
if let Ok(mut arr) = current.into_array() {
let value_dynamic = Dynamic::from(value);
if !arr
.iter()
.any(|v| v.as_int().unwrap_or(i64::MIN) == (value as i64))
{
arr.push(value_dynamic);
}
state.insert(key.to_string(), Dynamic::from(arr));
state.insert(format!("__op_{}", key), Dynamic::from("unique"));
}
});
});
engine.register_fn("track_unique", |key: &str, value: f64| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state.get(key).cloned().unwrap_or_else(|| {
Dynamic::from(rhai::Array::new())
});
if let Ok(mut arr) = current.into_array() {
let value_dynamic = Dynamic::from(value);
if !arr
.iter()
.any(|v| v.as_float().unwrap_or(f64::NAN) == value)
{
arr.push(value_dynamic);
}
state.insert(key.to_string(), Dynamic::from(arr));
state.insert(format!("__op_{}", key), Dynamic::from("unique"));
}
});
});
engine.register_fn("track_unique", |key: &str, value: f32| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state.get(key).cloned().unwrap_or_else(|| {
Dynamic::from(rhai::Array::new())
});
if let Ok(mut arr) = current.into_array() {
let value_dynamic = Dynamic::from(value);
if !arr
.iter()
.any(|v| v.as_float().unwrap_or(f64::NAN) == (value as f64))
{
arr.push(value_dynamic);
}
state.insert(key.to_string(), Dynamic::from(arr));
state.insert(format!("__op_{}", key), Dynamic::from("unique"));
}
});
});
engine.register_fn("track_bucket", |key: &str, bucket: &str| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state
.get(key)
.cloned()
.unwrap_or_else(|| Dynamic::from(rhai::Map::new()));
if let Some(mut map) = current.try_cast::<rhai::Map>() {
let count = map.get(bucket).cloned().unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + 1;
map.insert(bucket.into(), Dynamic::from(new_count));
state.insert(key.to_string(), Dynamic::from(map));
state.insert(format!("__op_{}", key), Dynamic::from("bucket"));
}
});
});
engine.register_fn("track_bucket", |key: &str, bucket: i64| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state
.get(key)
.cloned()
.unwrap_or_else(|| Dynamic::from(rhai::Map::new()));
if let Some(mut map) = current.try_cast::<rhai::Map>() {
let bucket_str = bucket.to_string();
let count = map
.get(bucket_str.as_str())
.cloned()
.unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + 1;
map.insert(bucket_str.into(), Dynamic::from(new_count));
state.insert(key.to_string(), Dynamic::from(map));
state.insert(format!("__op_{}", key), Dynamic::from("bucket"));
}
});
});
engine.register_fn("track_bucket", |key: &str, bucket: i32| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state
.get(key)
.cloned()
.unwrap_or_else(|| Dynamic::from(rhai::Map::new()));
if let Some(mut map) = current.try_cast::<rhai::Map>() {
let bucket_str = bucket.to_string();
let count = map
.get(bucket_str.as_str())
.cloned()
.unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + 1;
map.insert(bucket_str.into(), Dynamic::from(new_count));
state.insert(key.to_string(), Dynamic::from(map));
state.insert(format!("__op_{}", key), Dynamic::from("bucket"));
}
});
});
engine.register_fn("track_bucket", |key: &str, bucket: f64| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state
.get(key)
.cloned()
.unwrap_or_else(|| Dynamic::from(rhai::Map::new()));
if let Some(mut map) = current.try_cast::<rhai::Map>() {
let bucket_str = bucket.to_string();
let count = map
.get(bucket_str.as_str())
.cloned()
.unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + 1;
map.insert(bucket_str.into(), Dynamic::from(new_count));
state.insert(key.to_string(), Dynamic::from(map));
state.insert(format!("__op_{}", key), Dynamic::from("bucket"));
}
});
});
engine.register_fn("track_bucket", |key: &str, bucket: f32| {
THREAD_TRACKING_STATE.with(|state| {
let mut state = state.borrow_mut();
let current = state
.get(key)
.cloned()
.unwrap_or_else(|| Dynamic::from(rhai::Map::new()));
if let Some(mut map) = current.try_cast::<rhai::Map>() {
let bucket_str = bucket.to_string();
let count = map
.get(bucket_str.as_str())
.cloned()
.unwrap_or(Dynamic::from(0i64));
let new_count = count.as_int().unwrap_or(0) + 1;
map.insert(bucket_str.into(), Dynamic::from(new_count));
state.insert(key.to_string(), Dynamic::from(map));
state.insert(format!("__op_{}", key), Dynamic::from("bucket"));
}
});
});
}
/// Replaces this thread's tracking state with a copy of `tracked`; previous entries are discarded.
pub fn set_thread_tracking_state(tracked: &HashMap<String, Dynamic>) {
    THREAD_TRACKING_STATE.with(|cell| cell.borrow_mut().clone_from(tracked));
}
/// Returns an owned snapshot of this thread's tracking state.
pub fn get_thread_tracking_state() -> HashMap<String, Dynamic> {
    THREAD_TRACKING_STATE.with(|cell| {
        let snapshot = cell.borrow();
        (*snapshot).clone()
    })
}
/// Copies every entry of this thread's tracking state into `ctx.tracker` via `insert`.
/// NOTE(review): assumes `tracker` is a map, so existing keys are overwritten; confirm.
#[allow(dead_code)]
pub fn merge_thread_tracking_to_context(ctx: &mut crate::pipeline::PipelineContext) {
    get_thread_tracking_state()
        .into_iter()
        .for_each(|(name, entry)| {
            ctx.tracker.insert(name, entry);
        });
}
/// Returns an empty Rhai object map wrapped in a `Dynamic`.
#[allow(dead_code)]
fn get_metrics_map() -> Dynamic {
    let empty: rhai::Map = Default::default();
    Dynamic::from(empty)
}
/// Renders tracked user metrics as `key = value` lines sorted by key, with keys left-aligned
/// to 12 columns.
///
/// Internal bookkeeping entries are skipped: `__op_*`, `__kelora_stats_*` and
/// `__kelora_error_*`. This is the same set `format_metrics_json` skips. Maps that contain
/// both `count` and an array `sample` are shown as `{ count: N, sample: [...] }` with at most
/// three quoted samples. Every other value uses its integer, float or `Display` form.
///
/// Returns `"No metrics tracked"` when no user metric remains.
#[allow(dead_code)]
pub fn format_metrics_output(tracked: &HashMap<String, Dynamic>) -> String {
    let mut user_values: Vec<(&String, &Dynamic)> = tracked
        .iter()
        .filter(|(key, _)| {
            !(key.starts_with("__op_")
                || key.starts_with("__kelora_stats_")
                || key.starts_with("__kelora_error_"))
        })
        .collect();
    if user_values.is_empty() {
        return "No metrics tracked".to_string();
    }
    // Keys are unique (they come from a HashMap), so an unstable sort is deterministic.
    user_values.sort_unstable_by(|a, b| a.0.cmp(b.0));

    let mut output = String::new();
    for (key, value) in user_values {
        // Checking the type first avoids cloning values that cannot be maps.
        if value.is::<rhai::Map>() {
            if let Some(map) = value.clone().try_cast::<rhai::Map>() {
                if let (Some(count), Some(sample)) = (map.get("count"), map.get("sample")) {
                    if let Ok(sample_array) = sample.clone().into_array() {
                        let sample_strings: Vec<String> = sample_array
                            .iter()
                            .take(3)
                            .map(|v| {
                                format!("\"{}\"", v.clone().into_string().unwrap_or_default())
                            })
                            .collect();
                        output.push_str(&format!(
                            "{:<12} = {{ count: {}, sample: [{}] }}\n",
                            key,
                            count.as_int().unwrap_or(0),
                            sample_strings.join(", ")
                        ));
                        continue;
                    }
                }
            }
        }
        let rendered = if let Ok(int_value) = value.as_int() {
            int_value.to_string()
        } else if let Ok(float_value) = value.as_float() {
            float_value.to_string()
        } else {
            value.to_string()
        };
        output.push_str(&format!("{:<12} = {}\n", key, rendered));
    }
    output.trim_end().to_string()
}
#[allow(dead_code)] pub fn format_metrics_json(
tracked: &HashMap<String, Dynamic>,
) -> Result<String, serde_json::Error> {
let mut json_obj = serde_json::Map::new();
for (key, value) in tracked.iter() {
if key.starts_with("__op_")
|| key.starts_with("__kelora_stats_")
|| key.starts_with("__kelora_error_")
{
continue;
}
if value.is::<rhai::Map>() {
if let Some(map) = value.clone().try_cast::<rhai::Map>() {
if let (Some(count), Some(sample)) = (map.get("count"), map.get("sample")) {
let mut unique_obj = serde_json::Map::new();
unique_obj.insert(
"count".to_string(),
serde_json::Value::Number(serde_json::Number::from(
count.as_int().unwrap_or(0),
)),
);
if let Ok(sample_array) = sample.clone().into_array() {
let sample_values: Vec<serde_json::Value> = sample_array
.iter()
.map(|v| {
serde_json::Value::String(
v.clone().into_string().unwrap_or_default(),
)
})
.collect();
unique_obj.insert(
"sample".to_string(),
serde_json::Value::Array(sample_values),
);
}
json_obj.insert(key.clone(), serde_json::Value::Object(unique_obj));
continue;
}
}
}
if value.is_int() {
json_obj.insert(
key.clone(),
serde_json::Value::Number(serde_json::Number::from(value.as_int().unwrap_or(0))),
);
} else if value.is_float() {
if let Some(num) = serde_json::Number::from_f64(value.as_float().unwrap_or(0.0)) {
json_obj.insert(key.clone(), serde_json::Value::Number(num));
}
} else {
json_obj.insert(key.clone(), serde_json::Value::String(value.to_string()));
}
}
serde_json::to_string_pretty(&json_obj)
}
/// Builds a pretty-printed JSON summary of recorded errors, keyed by error type.
///
/// Each error type with a positive `__kelora_error_count_<type>` becomes an object with
/// a `count` field. Its `examples` field is filled from `__kelora_error_samples_<type>`,
/// which is the key `track_error` writes. The older `__kelora_error_examples_<type>` key is
/// still read as a fallback. Non-string examples are rendered as `""`.
///
/// Returns `None` when no positive error count is present.
#[allow(dead_code)]
pub fn extract_error_summary(tracked: &HashMap<String, Dynamic>) -> Option<String> {
    let mut summary = serde_json::Map::new();

    for (key, count_value) in tracked {
        let error_type = match key.strip_prefix("__kelora_error_count_") {
            Some(error_type) => error_type,
            None => continue,
        };
        let count = count_value.as_int().unwrap_or(0);
        if count <= 0 {
            continue;
        }

        let mut error_obj = serde_json::Map::new();
        error_obj.insert(
            "count".to_string(),
            serde_json::Value::Number(serde_json::Number::from(count)),
        );

        let examples_value = tracked
            .get(&format!("__kelora_error_samples_{}", error_type))
            .or_else(|| tracked.get(&format!("__kelora_error_examples_{}", error_type)));
        if let Some(Ok(examples_array)) = examples_value.map(|v| v.clone().into_array()) {
            let examples: Vec<serde_json::Value> = examples_array
                .iter()
                .map(|v| serde_json::Value::String(v.clone().into_string().unwrap_or_default()))
                .collect();
            error_obj.insert("examples".to_string(), serde_json::Value::Array(examples));
        }

        summary.insert(error_type.to_string(), serde_json::Value::Object(error_obj));
    }

    if summary.is_empty() {
        return None;
    }
    Some(
        serde_json::to_string_pretty(&summary)
            .unwrap_or_else(|_| "Error serializing summary".to_string()),
    )
}
/// Writes the JSON error summary produced by `extract_error_summary` to `file_path`.
///
/// When there is no summary (no recorded errors), nothing is written and `file_path` is left
/// untouched. Otherwise the file is created or truncated.
///
/// # Errors
/// Returns any I/O error raised while creating or writing the file.
#[allow(dead_code)]
pub fn write_error_summary_to_file(
    tracked: &HashMap<String, Dynamic>,
    file_path: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    extract_error_summary(tracked)
        .map_or(Ok(()), |summary| std::fs::write(file_path, summary))
        .map_err(Into::into)
}