#[cfg(feature = "telemetry")]
use opentelemetry::{
global::{BoxedSpan, BoxedTracer},
metrics::{Counter, Histogram, Meter, ObservableGauge},
trace::{Span, Status, Tracer},
KeyValue,
};
#[cfg(feature = "telemetry")]
use std::sync::atomic::{AtomicU64, Ordering};
#[cfg(feature = "telemetry")]
use std::sync::Arc;
/// Returns the resident set size of the current process in bytes.
///
/// On Linux the value comes from the `VmRSS:` line of `/proc/self/status`;
/// the number on that line is multiplied by 1024 to convert it to bytes.
/// On every other target the function returns `Ok(0)`.
///
/// # Errors
///
/// On Linux, returns the underlying I/O error if `/proc/self/status` cannot
/// be read, `ErrorKind::NotFound` if the file has no `VmRSS:` line, and
/// `ErrorKind::InvalidData` if the value on that line is not a `u64` or no
/// longer fits in a `u64` once converted to bytes.
#[cfg(feature = "telemetry")]
fn get_memory_usage() -> Result<u64, std::io::Error> {
    #[cfg(target_os = "linux")]
    {
        use std::io::{Error, ErrorKind};

        let status = std::fs::read_to_string("/proc/self/status")?;
        // `strip_prefix` both selects the line and drops the label, so the
        // first whitespace-separated token that remains is the numeric value.
        let rss_field = status
            .lines()
            .find_map(|line| line.strip_prefix("VmRSS:"))
            .ok_or_else(|| Error::new(ErrorKind::NotFound, "VmRSS not found"))?;
        let rss_bytes = rss_field
            .split_whitespace()
            .next()
            .and_then(|kib| kib.parse::<u64>().ok())
            // Checked so an absurd value becomes an error instead of wrapping.
            .and_then(|kib| kib.checked_mul(1024));
        rss_bytes.ok_or_else(|| Error::new(ErrorKind::InvalidData, "VmRSS value is malformed"))
    }
    #[cfg(not(target_os = "linux"))]
    {
        // No platform-specific probe is implemented here, so callers see zero.
        Ok(0)
    }
}
/// Bundle of OpenTelemetry instruments used to record validation metrics.
///
/// All instruments are created from a single `Meter` in
/// `ValidationMetrics::new`; the instrument names quoted below are the ones
/// registered there.
#[cfg(feature = "telemetry")]
pub struct ValidationMetrics {
/// Histogram `data.validation.duration` (unit `s`).
validation_duration: Histogram<f64>,
/// Histogram `data.validation.check.duration` (unit `s`).
check_duration: Histogram<f64>,
/// Histogram `data.processing.load.duration` (unit `s`).
data_load_duration: Histogram<f64>,
/// Counter `data.validation.rows`.
rows_processed: Counter<u64>,
/// Counter `data.validation.total`.
validation_runs: Counter<u64>,
/// Counter `data.validation.failures`.
validation_failures: Counter<u64>,
/// Counter `data.validation.checks.passed`.
checks_passed: Counter<u64>,
/// Counter `data.validation.checks.failed`.
checks_failed: Counter<u64>,
/// Number of validations currently in flight: incremented by
/// `start_validation` and decremented when the returned guard is dropped.
/// Within this file it is only read by the `Debug` implementation.
active_validations: Arc<AtomicU64>,
/// Observable gauge `data.validation.memory` (unit `By`); always `Some` when
/// built through `new`. Only its presence is read in this file —
/// presumably it is stored to keep the instrument handle alive; TODO confirm.
memory_usage_bytes: Option<ObservableGauge<u64>>,
/// Histogram `data.validation.custom_metric` (unit `1`).
custom_metrics: Histogram<f64>,
}
#[cfg(feature = "telemetry")]
impl ValidationMetrics {
    /// Registers every validation instrument on `meter`.
    ///
    /// The memory gauge is registered first, followed by the duration
    /// histograms, the counters and finally the custom-metric histogram.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`; the `Result` is part
    /// of the signature callers already depend on.
    pub fn new(meter: &Meter) -> crate::prelude::Result<Self> {
        // Local builders so each instrument below reads as a single declaration.
        let histogram = |name: &'static str, description: &'static str, unit: &'static str| {
            meter
                .f64_histogram(name)
                .with_description(description)
                .with_unit(unit)
                .build()
        };
        let counter = |name: &'static str, description: &'static str| {
            meter
                .u64_counter(name)
                .with_description(description)
                .with_unit("1")
                .build()
        };

        let active_validations = Arc::new(AtomicU64::new(0));

        let memory_gauge = meter
            .u64_observable_gauge("data.validation.memory")
            .with_description("Memory usage of validation process in bytes")
            .with_unit("By")
            .with_callback(move |observer| {
                // A failed probe simply skips this observation.
                if let Ok(bytes) = get_memory_usage() {
                    observer.observe(bytes, &[]);
                }
            })
            .build();

        let validation_duration = histogram(
            "data.validation.duration",
            "Duration of complete validation suite execution",
            "s",
        );
        let check_duration = histogram(
            "data.validation.check.duration",
            "Duration of individual validation checks",
            "s",
        );
        let data_load_duration = histogram(
            "data.processing.load.duration",
            "Time to load data for validation",
            "s",
        );
        let rows_processed = counter(
            "data.validation.rows",
            "Total number of rows processed during validation",
        );
        let validation_runs = counter(
            "data.validation.total",
            "Total number of validation runs",
        );
        let validation_failures = counter(
            "data.validation.failures",
            "Total number of failed validations",
        );
        let checks_passed = counter(
            "data.validation.checks.passed",
            "Total number of passed checks",
        );
        let checks_failed = counter(
            "data.validation.checks.failed",
            "Total number of failed checks",
        );
        let custom_metrics = histogram(
            "data.validation.custom_metric",
            "Custom business metrics from validation constraints",
            "1",
        );

        Ok(Self {
            validation_duration,
            check_duration,
            data_load_duration,
            rows_processed,
            validation_runs,
            validation_failures,
            checks_passed,
            checks_failed,
            active_validations,
            memory_usage_bytes: Some(memory_gauge),
            custom_metrics,
        })
    }

    /// Records `duration_secs` on the suite-duration histogram.
    pub fn record_validation_duration(&self, duration_secs: f64, attributes: &[KeyValue]) {
        self.validation_duration.record(duration_secs, attributes);
    }

    /// Records `duration_secs` on the per-check duration histogram.
    pub fn record_check_duration(&self, duration_secs: f64, attributes: &[KeyValue]) {
        self.check_duration.record(duration_secs, attributes);
    }

    /// Records `duration_secs` on the data-load duration histogram.
    pub fn record_data_load_duration(&self, duration_secs: f64, attributes: &[KeyValue]) {
        self.data_load_duration.record(duration_secs, attributes);
    }

    /// Adds `count` to the processed-rows counter.
    pub fn add_rows_processed(&self, count: u64, attributes: &[KeyValue]) {
        self.rows_processed.add(count, attributes);
    }

    /// Adds one to the validation-runs counter.
    pub fn increment_validation_runs(&self, attributes: &[KeyValue]) {
        self.validation_runs.add(1, attributes);
    }

    /// Adds one to the validation-failures counter.
    pub fn increment_validation_failures(&self, attributes: &[KeyValue]) {
        self.validation_failures.add(1, attributes);
    }

    /// Adds one to the passed-checks counter.
    pub fn increment_checks_passed(&self, attributes: &[KeyValue]) {
        self.checks_passed.add(1, attributes);
    }

    /// Adds one to the failed-checks counter.
    pub fn increment_checks_failed(&self, attributes: &[KeyValue]) {
        self.checks_failed.add(1, attributes);
    }

    /// Marks one validation as active and returns a guard for it.
    ///
    /// The shared counter is incremented here and decremented again when the
    /// returned guard is dropped, so the guard must be kept alive for as long
    /// as the validation runs.
    pub fn start_validation(&self) -> ActiveValidationGuard {
        let counter = Arc::clone(&self.active_validations);
        counter.fetch_add(1, Ordering::Relaxed);
        ActiveValidationGuard { counter }
    }

    /// Records `value` on the custom-metric histogram.
    pub fn record_custom_metric(&self, value: f64, attributes: &[KeyValue]) {
        self.custom_metrics.record(value, attributes);
    }
}
#[cfg(feature = "telemetry")]
impl std::fmt::Debug for ValidationMetrics {
    /// Prints only the in-flight validation count and whether the memory
    /// gauge is present; the instrument handles themselves are not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let active = self.active_validations.load(Ordering::Relaxed);
        let has_gauge = self.memory_usage_bytes.is_some();

        let mut out = f.debug_struct("ValidationMetrics");
        out.field("active_validations", &active);
        out.field("has_memory_gauge", &has_gauge);
        out.finish()
    }
}
/// Guard handed out by `ValidationMetrics::start_validation`.
///
/// It holds a handle to the shared active-validation counter. The counter is
/// decremented when the guard is dropped, so discarding the guard right away
/// immediately undoes the increment; `#[must_use]` makes that mistake a
/// compiler warning.
#[cfg(feature = "telemetry")]
#[derive(Debug)]
#[must_use = "the active-validation count is decremented as soon as the guard is dropped"]
pub struct ActiveValidationGuard {
    /// Shared counter of validations currently in flight.
    counter: Arc<AtomicU64>,
}
#[cfg(feature = "telemetry")]
impl Drop for ActiveValidationGuard {
    /// Decrements the shared active-validation counter by one.
    fn drop(&mut self) {
        // The previous value is of no interest; only the decrement matters.
        let _previous = self.counter.fetch_sub(1, Ordering::Relaxed);
    }
}
/// Telemetry configuration and entry point for starting validation spans.
///
/// The tracer and metrics fields only exist when the `telemetry` feature is
/// enabled; the three public fields are always present.
#[derive(Debug)]
pub struct TermTelemetry {
/// Tracer on which the `start_*_span` methods start their spans.
#[cfg(feature = "telemetry")]
tracer: BoxedTracer,
/// Shared metric instruments; `None` until `with_meter` is called.
#[cfg(feature = "telemetry")]
metrics: Option<Arc<ValidationMetrics>>,
/// Detailed-metrics switch. Not read anywhere in this file — presumably
/// consulted by the code that drives validation; TODO confirm.
pub detailed_metrics: bool,
/// Timing switch. Not read anywhere in this file — presumably consulted by
/// the code that drives validation; TODO confirm.
pub record_timing: bool,
/// Key/value pairs that the `start_*_span` methods set on every span they
/// start when the `telemetry` feature is enabled.
pub custom_attributes: std::collections::HashMap<String, String>,
}
impl TermTelemetry {
    /// Creates a telemetry handle that starts its spans on `tracer`.
    ///
    /// No metrics are attached, detailed metrics and timing are both switched
    /// on, and the custom attribute map starts out empty.
    #[cfg(feature = "telemetry")]
    pub fn new(tracer: BoxedTracer) -> Self {
        Self {
            tracer,
            metrics: None,
            detailed_metrics: true,
            record_timing: true,
            custom_attributes: std::collections::HashMap::new(),
        }
    }

    /// Creates a handle with detailed metrics and timing switched off and no
    /// custom attributes.
    ///
    /// NOTE(review): with the `telemetry` feature enabled the tracer comes from
    /// `opentelemetry::global::tracer("noop")`, so whether spans are really
    /// discarded presumably depends on the globally installed provider — confirm.
    pub fn disabled() -> Self {
        Self {
            #[cfg(feature = "telemetry")]
            tracer: opentelemetry::global::tracer("noop"),
            #[cfg(feature = "telemetry")]
            metrics: None,
            detailed_metrics: false,
            record_timing: false,
            custom_attributes: std::collections::HashMap::new(),
        }
    }

    /// Attaches metric instruments created from `meter`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `ValidationMetrics::new`.
    #[cfg(feature = "telemetry")]
    pub fn with_meter(mut self, meter: &Meter) -> crate::prelude::Result<Self> {
        self.metrics = Some(Arc::new(ValidationMetrics::new(meter)?));
        Ok(self)
    }

    /// Returns the attached metric instruments, if any.
    #[cfg(feature = "telemetry")]
    pub fn metrics(&self) -> Option<&Arc<ValidationMetrics>> {
        self.metrics.as_ref()
    }

    /// Sets the `detailed_metrics` flag.
    pub fn with_detailed_metrics(mut self, enabled: bool) -> Self {
        self.detailed_metrics = enabled;
        self
    }

    /// Sets the `record_timing` flag.
    pub fn with_timing(mut self, enabled: bool) -> Self {
        self.record_timing = enabled;
        self
    }

    /// Adds one custom attribute, replacing any existing value for `key`.
    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.custom_attributes.insert(key.into(), value.into());
        self
    }

    /// Adds several custom attributes; later pairs replace earlier values
    /// that share the same key.
    pub fn with_attributes<I, K, V>(mut self, attributes: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        self.custom_attributes.extend(
            attributes
                .into_iter()
                .map(|(key, value)| (key.into(), value.into())),
        );
        self
    }

    /// Starts a span called `name`, sets `attributes` on it in order and then
    /// sets every custom attribute of this handle.
    ///
    /// Shared by all `start_*_span` methods so the custom-attribute handling
    /// lives in exactly one place.
    #[cfg(feature = "telemetry")]
    fn start_span_with_attributes(
        &self,
        name: String,
        attributes: Vec<opentelemetry::KeyValue>,
    ) -> TermSpan {
        let mut span = self.tracer.start(name);
        for attribute in attributes {
            span.set_attribute(attribute);
        }
        // Custom attributes are applied last, exactly as each method used to do.
        for (key, value) in &self.custom_attributes {
            span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
        }
        TermSpan::new(span)
    }

    /// Starts the span `validation_suite.<suite_name>` carrying the suite
    /// name, the number of checks and `validation.type = "suite"`.
    #[cfg(feature = "telemetry")]
    pub fn start_suite_span(&self, suite_name: &str, check_count: usize) -> TermSpan {
        self.start_span_with_attributes(
            format!("validation_suite.{suite_name}"),
            vec![
                opentelemetry::KeyValue::new("validation.suite.name", suite_name.to_string()),
                opentelemetry::KeyValue::new("validation.suite.check_count", check_count as i64),
                opentelemetry::KeyValue::new("validation.type", "suite"),
            ],
        )
    }

    /// Returns a no-op span; the `telemetry` feature is disabled.
    #[cfg(not(feature = "telemetry"))]
    pub fn start_suite_span(&self, _suite_name: &str, _check_count: usize) -> TermSpan {
        TermSpan::noop()
    }

    /// Starts the span `validation_check.<check_name>` carrying the check
    /// name, the number of constraints and `validation.type = "check"`.
    #[cfg(feature = "telemetry")]
    pub fn start_check_span(&self, check_name: &str, constraint_count: usize) -> TermSpan {
        self.start_span_with_attributes(
            format!("validation_check.{check_name}"),
            vec![
                opentelemetry::KeyValue::new("validation.check.name", check_name.to_string()),
                opentelemetry::KeyValue::new(
                    "validation.check.constraint_count",
                    constraint_count as i64,
                ),
                opentelemetry::KeyValue::new("validation.type", "check"),
            ],
        )
    }

    /// Returns a no-op span; the `telemetry` feature is disabled.
    #[cfg(not(feature = "telemetry"))]
    pub fn start_check_span(&self, _check_name: &str, _constraint_count: usize) -> TermSpan {
        TermSpan::noop()
    }

    /// Starts the span `validation_constraint.<constraint_name>` carrying the
    /// constraint name, `validation.type = "constraint"` and, when `column`
    /// is given, the column name.
    #[cfg(feature = "telemetry")]
    pub fn start_constraint_span(&self, constraint_name: &str, column: Option<&str>) -> TermSpan {
        let mut attributes = vec![
            opentelemetry::KeyValue::new(
                "validation.constraint.name",
                constraint_name.to_string(),
            ),
            opentelemetry::KeyValue::new("validation.type", "constraint"),
        ];
        if let Some(col) = column {
            attributes.push(opentelemetry::KeyValue::new(
                "validation.constraint.column",
                col.to_string(),
            ));
        }
        self.start_span_with_attributes(
            format!("validation_constraint.{constraint_name}"),
            attributes,
        )
    }

    /// Returns a no-op span; the `telemetry` feature is disabled.
    #[cfg(not(feature = "telemetry"))]
    pub fn start_constraint_span(&self, _constraint_name: &str, _column: Option<&str>) -> TermSpan {
        TermSpan::noop()
    }

    /// Starts the span `data_source.<source_type>` carrying the source type,
    /// the table name and `validation.type = "data_source"`.
    #[cfg(feature = "telemetry")]
    pub fn start_datasource_span(&self, source_type: &str, table_name: &str) -> TermSpan {
        self.start_span_with_attributes(
            format!("data_source.{source_type}"),
            vec![
                opentelemetry::KeyValue::new("data_source.type", source_type.to_string()),
                opentelemetry::KeyValue::new("data_source.table_name", table_name.to_string()),
                opentelemetry::KeyValue::new("validation.type", "data_source"),
            ],
        )
    }

    /// Returns a no-op span; the `telemetry` feature is disabled.
    #[cfg(not(feature = "telemetry"))]
    pub fn start_datasource_span(&self, _source_type: &str, _table_name: &str) -> TermSpan {
        TermSpan::noop()
    }
}
impl Clone for TermTelemetry {
    /// Copies the flags, the custom attributes and (with the `telemetry`
    /// feature) the shared metrics handle.
    ///
    /// NOTE(review): the tracer is not copied. The clone fetches a fresh one
    /// with `opentelemetry::global::tracer("noop")`, so a tracer passed to
    /// `TermTelemetry::new` does not carry over — confirm this is intended.
    fn clone(&self) -> Self {
        Self {
            #[cfg(feature = "telemetry")]
            tracer: opentelemetry::global::tracer("noop"),
            #[cfg(feature = "telemetry")]
            metrics: self.metrics.as_ref().map(Arc::clone),
            detailed_metrics: self.detailed_metrics,
            record_timing: self.record_timing,
            custom_attributes: std::collections::HashMap::clone(&self.custom_attributes),
        }
    }
}
/// Span handle returned by the `TermTelemetry::start_*_span` methods.
///
/// With the `telemetry` feature it wraps an OpenTelemetry `BoxedSpan`;
/// without it the type is an empty placeholder. The `Drop` implementation in
/// this file ends the wrapped span, so a handle that is discarded immediately
/// produces a span that ends immediately — `#[must_use]` turns that mistake
/// into a compiler warning.
#[must_use = "the span ends as soon as this handle is dropped"]
pub struct TermSpan {
    /// The wrapped OpenTelemetry span.
    #[cfg(feature = "telemetry")]
    span: BoxedSpan,
    /// Placeholder so the struct has a field when telemetry is compiled out.
    #[cfg(not(feature = "telemetry"))]
    _phantom: std::marker::PhantomData<()>,
}
impl TermSpan {
#[cfg(feature = "telemetry")]
fn new(span: BoxedSpan) -> Self {
Self { span }
}
pub fn noop() -> Self {
Self {
#[cfg(feature = "telemetry")]
span: opentelemetry::global::tracer("noop").start("noop"),
#[cfg(not(feature = "telemetry"))]
_phantom: std::marker::PhantomData,
}
}
#[cfg(feature = "telemetry")]
pub fn add_event(&mut self, name: impl Into<String>, attributes: Vec<opentelemetry::KeyValue>) {
self.span.add_event(name.into(), attributes);
}
#[cfg(not(feature = "telemetry"))]
pub fn add_event(&mut self, _name: impl Into<String>, _attributes: Vec<()>) {
}
#[cfg(feature = "telemetry")]
pub fn set_attribute(&mut self, kv: opentelemetry::KeyValue) {
self.span.set_attribute(kv);
}
#[cfg(not(feature = "telemetry"))]
pub fn set_attribute(&mut self, _kv: ()) {
}
#[cfg(feature = "telemetry")]
pub fn set_status(&mut self, status: Status) {
self.span.set_status(status);
}
#[cfg(not(feature = "telemetry"))]
pub fn set_status(&mut self, _status: ()) {
}
#[cfg(feature = "telemetry")]
pub fn record_error(&mut self, error: &dyn std::error::Error) {
self.span.record_error(error);
self.span.set_status(Status::Error {
description: error.to_string().into(),
});
}
#[cfg(not(feature = "telemetry"))]
pub fn record_error(&mut self, _error: &dyn std::error::Error) {
}
}
impl Drop for TermSpan {
    /// Ends the wrapped span when the `telemetry` feature is enabled and does
    /// nothing otherwise.
    fn drop(&mut self) {
        #[cfg(feature = "telemetry")]
        self.span.end();
    }
}
/// Helpers that copy validation outcomes onto a [`TermSpan`] as attributes.
///
/// Each helper has a no-op twin with the same signature that is compiled when
/// the `telemetry` feature is disabled.
pub mod utils {
    use super::*;

    /// Sets the passed / failed / skipped / total counts and the duration in
    /// milliseconds on `span`.
    ///
    /// The counts are widened to `i64` before they are summed, so the total
    /// cannot overflow, and a duration above `i64::MAX` is clamped to
    /// `i64::MAX` instead of wrapping to a negative number.
    #[cfg(feature = "telemetry")]
    pub fn record_validation_metrics(
        span: &mut TermSpan,
        passed: u32,
        failed: u32,
        skipped: u32,
        duration_ms: u64,
    ) {
        // Widen first: three `u32` values added together can exceed `u32::MAX`.
        let (passed, failed, skipped) = (i64::from(passed), i64::from(failed), i64::from(skipped));
        // The attribute value is signed; clamp rather than reinterpret the bits.
        let duration_ms = duration_ms.min(i64::MAX as u64) as i64;

        let attributes = [
            ("validation.metrics.passed", passed),
            ("validation.metrics.failed", failed),
            ("validation.metrics.skipped", skipped),
            ("validation.metrics.total", passed + failed + skipped),
            ("validation.duration_ms", duration_ms),
        ];
        for &(key, value) in attributes.iter() {
            span.set_attribute(opentelemetry::KeyValue::new(key, value));
        }
    }

    /// Does nothing; the `telemetry` feature is disabled.
    #[cfg(not(feature = "telemetry"))]
    pub fn record_validation_metrics(
        _span: &mut TermSpan,
        _passed: u32,
        _failed: u32,
        _skipped: u32,
        _duration_ms: u64,
    ) {
    }

    /// Sets the constraint status (`success`, `failure` or `skipped`) on
    /// `span`, followed by the result's metric and message when present.
    #[cfg(feature = "telemetry")]
    pub fn record_constraint_result(span: &mut TermSpan, result: &crate::core::ConstraintResult) {
        use crate::core::ConstraintStatus;

        let status_str = match result.status {
            ConstraintStatus::Success => "success",
            ConstraintStatus::Failure => "failure",
            ConstraintStatus::Skipped => "skipped",
        };
        span.set_attribute(opentelemetry::KeyValue::new(
            "validation.constraint.status",
            status_str,
        ));

        if let Some(metric) = result.metric {
            span.set_attribute(opentelemetry::KeyValue::new(
                "validation.constraint.metric",
                metric,
            ));
        }

        if let Some(ref message) = result.message {
            span.set_attribute(opentelemetry::KeyValue::new(
                "validation.constraint.message",
                message.clone(),
            ));
        }
    }

    /// Does nothing; the `telemetry` feature is disabled.
    #[cfg(not(feature = "telemetry"))]
    pub fn record_constraint_result(_span: &mut TermSpan, _result: &crate::core::ConstraintResult) {
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `disabled()` switches both optional behaviors off.
    #[test]
    fn test_disabled_telemetry() {
        let telemetry = TermTelemetry::disabled();
        assert_eq!(
            (telemetry.detailed_metrics, telemetry.record_timing),
            (false, false)
        );
    }

    /// The builder methods set the flags and collect every custom attribute.
    #[test]
    fn test_telemetry_configuration() {
        let telemetry = TermTelemetry::disabled()
            .with_detailed_metrics(true)
            .with_timing(true)
            .with_attribute("service.name", "test_service")
            .with_attributes([("env", "test"), ("version", "1.0.0")]);

        assert!(telemetry.detailed_metrics);
        assert!(telemetry.record_timing);

        let expected = [
            ("service.name", "test_service"),
            ("env", "test"),
            ("version", "1.0.0"),
        ];
        for (key, value) in &expected {
            assert_eq!(
                telemetry.custom_attributes.get(*key).map(String::as_str),
                Some(*value)
            );
        }
    }

    /// Every span operation can be called on a span from a disabled handle.
    #[test]
    fn test_noop_span_operations() {
        let telemetry = TermTelemetry::disabled();
        let mut span = telemetry.start_suite_span("test_suite", 5);

        span.add_event("test_event", vec![]);
        // `set_attribute` takes a `KeyValue` only when telemetry is compiled in.
        #[cfg(feature = "telemetry")]
        span.set_attribute(opentelemetry::KeyValue::new("test_key", "test_value"));

        let failure = std::io::Error::new(std::io::ErrorKind::Other, "test error");
        span.record_error(&failure);
    }

    /// All four span constructors work with a tracer from the global provider.
    #[cfg(feature = "telemetry")]
    #[test]
    fn test_telemetry_with_noop_tracer() {
        let telemetry = TermTelemetry::new(opentelemetry::global::tracer("test"));

        let _suite_span = telemetry.start_suite_span("test_suite", 3);
        let _check_span = telemetry.start_check_span("test_check", 2);
        let _constraint_span =
            telemetry.start_constraint_span("test_constraint", Some("test_column"));
        let _datasource_span = telemetry.start_datasource_span("csv", "test_table");
    }
}