1#[cfg(feature = "telemetry")]
84use opentelemetry::{
85 global::{BoxedSpan, BoxedTracer},
86 metrics::{Counter, Histogram, Meter, ObservableGauge},
87 trace::{Span, Status, Tracer},
88 KeyValue,
89};
90
91#[cfg(feature = "telemetry")]
92use std::sync::atomic::{AtomicU64, Ordering};
93#[cfg(feature = "telemetry")]
94use std::sync::Arc;
95
96#[cfg(feature = "telemetry")]
/// Returns this process's resident set size in bytes.
///
/// On Linux the value is the `VmRSS` entry of `/proc/self/status` (reported in
/// kB) multiplied by 1024.
///
/// # Errors
///
/// * Any I/O error from reading `/proc/self/status`.
/// * `ErrorKind::NotFound` when no parsable `VmRSS` entry is present.
/// * `ErrorKind::Unsupported` on non-Linux targets. Returning an error rather
///   than `Ok(0)` lets callers that only act on `Ok` (such as the memory gauge
///   callback) skip the observation instead of reporting a fake zero.
fn get_memory_usage() -> Result<u64, std::io::Error> {
    #[cfg(target_os = "linux")]
    {
        let status = std::fs::read_to_string("/proc/self/status")?;
        parse_vm_rss_bytes(&status).ok_or_else(|| {
            std::io::Error::new(std::io::ErrorKind::NotFound, "VmRSS not found")
        })
    }

    #[cfg(not(target_os = "linux"))]
    {
        Err(std::io::Error::new(
            std::io::ErrorKind::Unsupported,
            "resident memory usage is only available on Linux",
        ))
    }
}

/// Extracts the `VmRSS` entry (in kB) from `/proc/<pid>/status`-formatted text
/// and returns it in bytes.
///
/// Returns `None` when no `VmRSS:` line carries a parsable number, or when the
/// byte count would overflow `u64`.
// Only reachable from the Linux branch of `get_memory_usage`, which is itself
// gated on the `telemetry` feature; silence dead-code warnings elsewhere.
#[cfg_attr(
    not(all(feature = "telemetry", target_os = "linux")),
    allow(dead_code)
)]
fn parse_vm_rss_bytes(status: &str) -> Option<u64> {
    status
        .lines()
        .filter_map(|line| line.strip_prefix("VmRSS:"))
        .find_map(|rest| rest.split_whitespace().next()?.parse::<u64>().ok())
        .and_then(|kb| kb.checked_mul(1024))
}
127
#[cfg(feature = "telemetry")]
/// OpenTelemetry instruments used to report validation activity.
///
/// All instruments are created from a single `Meter` by
/// `ValidationMetrics::new`; the `record_*`, `add_*` and `increment_*` methods
/// forward values to them.
pub struct ValidationMetrics {
    /// Histogram `data.validation.duration` (unit `s`): complete suite runs.
    validation_duration: Histogram<f64>,
    /// Histogram `data.validation.check.duration` (unit `s`): individual checks.
    check_duration: Histogram<f64>,
    /// Histogram `data.processing.load.duration` (unit `s`): loading input data.
    data_load_duration: Histogram<f64>,

    /// Counter `data.validation.rows`: rows processed during validation.
    rows_processed: Counter<u64>,
    /// Counter `data.validation.total`: validation runs.
    validation_runs: Counter<u64>,
    /// Counter `data.validation.failures`: failed validations.
    validation_failures: Counter<u64>,
    /// Counter `data.validation.checks.passed`: passed checks.
    checks_passed: Counter<u64>,
    /// Counter `data.validation.checks.failed`: failed checks.
    checks_failed: Counter<u64>,

    /// Number of validations currently in flight. `start_validation` increments
    /// it and the returned `ActiveValidationGuard` (which holds a clone of this
    /// `Arc`) decrements it on drop.
    /// NOTE(review): not exported through any OpenTelemetry instrument in this
    /// file; it is only surfaced by the `Debug` impl.
    active_validations: Arc<AtomicU64>,
    /// Observable gauge `data.validation.memory` (unit `By`). Its callback
    /// observes the value returned by `get_memory_usage` whenever that call
    /// succeeds. Always `Some` when built by `new`; the handle is kept so the
    /// instrument lives as long as this struct.
    memory_usage_bytes: Option<ObservableGauge<u64>>,

    /// Histogram `data.validation.custom_metric` (unit `1`): arbitrary values
    /// reported by validation constraints.
    custom_metrics: Histogram<f64>,
}
153
#[cfg(feature = "telemetry")]
impl ValidationMetrics {
    /// Creates every validation instrument on `meter`.
    ///
    /// The observable memory gauge is registered first. Its callback observes
    /// the result of `get_memory_usage()` (with no attributes) when that call
    /// succeeds, and observes nothing otherwise.
    ///
    /// # Errors
    ///
    /// The current body never returns `Err`.
    pub fn new(meter: &Meter) -> crate::prelude::Result<Self> {
        let active_validations = Arc::new(AtomicU64::new(0));

        let memory_usage = Some(
            meter
                .u64_observable_gauge("data.validation.memory")
                .with_description("Memory usage of validation process in bytes")
                .with_unit("By")
                .with_callback(move |observer| {
                    // Best effort: when no reading is available this collection
                    // cycle simply records nothing.
                    if let Ok(usage) = get_memory_usage() {
                        observer.observe(usage, &[]);
                    }
                })
                .build(),
        );

        Ok(Self {
            validation_duration: meter
                .f64_histogram("data.validation.duration")
                .with_description("Duration of complete validation suite execution")
                .with_unit("s")
                .build(),

            check_duration: meter
                .f64_histogram("data.validation.check.duration")
                .with_description("Duration of individual validation checks")
                .with_unit("s")
                .build(),

            data_load_duration: meter
                .f64_histogram("data.processing.load.duration")
                .with_description("Time to load data for validation")
                .with_unit("s")
                .build(),

            rows_processed: meter
                .u64_counter("data.validation.rows")
                .with_description("Total number of rows processed during validation")
                .with_unit("1")
                .build(),

            validation_runs: meter
                .u64_counter("data.validation.total")
                .with_description("Total number of validation runs")
                .with_unit("1")
                .build(),

            validation_failures: meter
                .u64_counter("data.validation.failures")
                .with_description("Total number of failed validations")
                .with_unit("1")
                .build(),

            checks_passed: meter
                .u64_counter("data.validation.checks.passed")
                .with_description("Total number of passed checks")
                .with_unit("1")
                .build(),

            checks_failed: meter
                .u64_counter("data.validation.checks.failed")
                .with_description("Total number of failed checks")
                .with_unit("1")
                .build(),

            active_validations,

            memory_usage_bytes: memory_usage,

            custom_metrics: meter
                .f64_histogram("data.validation.custom_metric")
                .with_description("Custom business metrics from validation constraints")
                .with_unit("1")
                .build(),
        })
    }

    /// Records a complete suite duration, in seconds, on `data.validation.duration`.
    pub fn record_validation_duration(&self, duration_secs: f64, attributes: &[KeyValue]) {
        self.validation_duration.record(duration_secs, attributes);
    }

    /// Records a single check duration, in seconds, on `data.validation.check.duration`.
    pub fn record_check_duration(&self, duration_secs: f64, attributes: &[KeyValue]) {
        self.check_duration.record(duration_secs, attributes);
    }

    /// Records a data-loading duration, in seconds, on `data.processing.load.duration`.
    pub fn record_data_load_duration(&self, duration_secs: f64, attributes: &[KeyValue]) {
        self.data_load_duration.record(duration_secs, attributes);
    }

    /// Adds `count` to the `data.validation.rows` counter.
    pub fn add_rows_processed(&self, count: u64, attributes: &[KeyValue]) {
        self.rows_processed.add(count, attributes);
    }

    /// Adds one to the `data.validation.total` counter.
    pub fn increment_validation_runs(&self, attributes: &[KeyValue]) {
        self.validation_runs.add(1, attributes);
    }

    /// Adds one to the `data.validation.failures` counter.
    pub fn increment_validation_failures(&self, attributes: &[KeyValue]) {
        self.validation_failures.add(1, attributes);
    }

    /// Adds one to the `data.validation.checks.passed` counter.
    pub fn increment_checks_passed(&self, attributes: &[KeyValue]) {
        self.checks_passed.add(1, attributes);
    }

    /// Adds one to the `data.validation.checks.failed` counter.
    pub fn increment_checks_failed(&self, attributes: &[KeyValue]) {
        self.checks_failed.add(1, attributes);
    }

    /// Marks one validation as in flight and returns a guard that un-marks it
    /// when dropped.
    ///
    /// Keep the guard alive for the whole validation: discarding it
    /// immediately decrements the counter again.
    #[must_use = "the active-validation count is decremented as soon as the guard is dropped"]
    pub fn start_validation(&self) -> ActiveValidationGuard {
        self.active_validations.fetch_add(1, Ordering::Relaxed);
        ActiveValidationGuard {
            counter: Arc::clone(&self.active_validations),
        }
    }

    /// Records `value` on the `data.validation.custom_metric` histogram.
    pub fn record_custom_metric(&self, value: f64, attributes: &[KeyValue]) {
        self.custom_metrics.record(value, attributes);
    }
}
292
#[cfg(feature = "telemetry")]
impl std::fmt::Debug for ValidationMetrics {
    /// Shows the current in-flight validation count and whether the memory
    /// gauge exists. The instrument handles are intentionally omitted, and
    /// `finish_non_exhaustive` marks that omission with a trailing `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ValidationMetrics")
            .field(
                "active_validations",
                &self.active_validations.load(Ordering::Relaxed),
            )
            .field("has_memory_gauge", &self.memory_usage_bytes.is_some())
            .finish_non_exhaustive()
    }
}
306
307#[cfg(feature = "telemetry")]
/// Guard returned by `ValidationMetrics::start_validation`.
///
/// Its `Drop` impl decrements the shared in-flight validation counter, so it
/// must be held for as long as the validation it tracks is running.
#[derive(Debug)]
#[must_use = "dropping the guard immediately decrements the active validation count"]
pub struct ActiveValidationGuard {
    counter: Arc<AtomicU64>,
}
312
#[cfg(feature = "telemetry")]
impl Drop for ActiveValidationGuard {
    /// Gives back the slot this guard took in the shared in-flight counter.
    fn drop(&mut self) {
        let in_flight: &AtomicU64 = &self.counter;
        in_flight.fetch_sub(1, Ordering::Relaxed);
    }
}
319
/// Entry point for tracing (and, optionally, metrics) around validation runs.
///
/// Build it with `TermTelemetry::new` (detailed metrics and timing on) or
/// `TermTelemetry::disabled` (both flags off), then refine it with the
/// `with_*` builder methods.
#[derive(Debug)]
pub struct TermTelemetry {
    /// Tracer that the `start_*_span` methods start spans from.
    #[cfg(feature = "telemetry")]
    tracer: BoxedTracer,

    /// Metric instruments attached via `with_meter`; `None` until then.
    #[cfg(feature = "telemetry")]
    metrics: Option<Arc<ValidationMetrics>>,

    /// Detailed-metrics toggle (`true` from `new`, `false` from `disabled`).
    /// NOTE(review): it is only stored here; its consumers are outside this view.
    pub detailed_metrics: bool,

    /// Timing toggle (`true` from `new`, `false` from `disabled`).
    /// NOTE(review): it is only stored here; its consumers are outside this view.
    pub record_timing: bool,

    /// Extra key/value pairs that, with the `telemetry` feature, are set as
    /// string attributes on every span created by the `start_*_span` methods.
    pub custom_attributes: std::collections::HashMap<String, String>,
}
342
343impl TermTelemetry {
344 #[cfg(feature = "telemetry")]
362 pub fn new(tracer: BoxedTracer) -> Self {
363 Self {
364 tracer,
365 metrics: None,
366 detailed_metrics: true,
367 record_timing: true,
368 custom_attributes: std::collections::HashMap::new(),
369 }
370 }
371
372 pub fn disabled() -> Self {
377 Self {
378 #[cfg(feature = "telemetry")]
379 tracer: opentelemetry::global::tracer("noop"),
380 #[cfg(feature = "telemetry")]
381 metrics: None,
382 detailed_metrics: false,
383 record_timing: false,
384 custom_attributes: std::collections::HashMap::new(),
385 }
386 }
387
388 #[cfg(feature = "telemetry")]
404 pub fn with_meter(mut self, meter: &Meter) -> crate::prelude::Result<Self> {
405 self.metrics = Some(Arc::new(ValidationMetrics::new(meter)?));
406 Ok(self)
407 }
408
409 #[cfg(feature = "telemetry")]
411 pub fn metrics(&self) -> Option<&Arc<ValidationMetrics>> {
412 self.metrics.as_ref()
413 }
414
415 pub fn with_detailed_metrics(mut self, enabled: bool) -> Self {
417 self.detailed_metrics = enabled;
418 self
419 }
420
421 pub fn with_timing(mut self, enabled: bool) -> Self {
423 self.record_timing = enabled;
424 self
425 }
426
427 pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
429 self.custom_attributes.insert(key.into(), value.into());
430 self
431 }
432
433 pub fn with_attributes<I, K, V>(mut self, attributes: I) -> Self
435 where
436 I: IntoIterator<Item = (K, V)>,
437 K: Into<String>,
438 V: Into<String>,
439 {
440 for (key, value) in attributes {
441 self.custom_attributes.insert(key.into(), value.into());
442 }
443 self
444 }
445
446 #[cfg(feature = "telemetry")]
453 pub fn start_suite_span(&self, suite_name: &str, check_count: usize) -> TermSpan {
454 let mut span = self.tracer.start(format!("validation_suite.{suite_name}"));
455
456 span.set_attribute(opentelemetry::KeyValue::new(
458 "validation.suite.name",
459 suite_name.to_string(),
460 ));
461 span.set_attribute(opentelemetry::KeyValue::new(
462 "validation.suite.check_count",
463 check_count as i64,
464 ));
465 span.set_attribute(opentelemetry::KeyValue::new("validation.type", "suite"));
466
467 for (key, value) in &self.custom_attributes {
469 span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
470 }
471
472 TermSpan::new(span)
473 }
474
475 #[cfg(not(feature = "telemetry"))]
477 pub fn start_suite_span(&self, _suite_name: &str, _check_count: usize) -> TermSpan {
478 TermSpan::noop()
479 }
480
481 #[cfg(feature = "telemetry")]
483 pub fn start_check_span(&self, check_name: &str, constraint_count: usize) -> TermSpan {
484 let mut span = self.tracer.start(format!("validation_check.{check_name}"));
485
486 span.set_attribute(opentelemetry::KeyValue::new(
487 "validation.check.name",
488 check_name.to_string(),
489 ));
490 span.set_attribute(opentelemetry::KeyValue::new(
491 "validation.check.constraint_count",
492 constraint_count as i64,
493 ));
494 span.set_attribute(opentelemetry::KeyValue::new("validation.type", "check"));
495
496 for (key, value) in &self.custom_attributes {
497 span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
498 }
499
500 TermSpan::new(span)
501 }
502
503 #[cfg(not(feature = "telemetry"))]
504 pub fn start_check_span(&self, _check_name: &str, _constraint_count: usize) -> TermSpan {
505 TermSpan::noop()
506 }
507
508 #[cfg(feature = "telemetry")]
510 pub fn start_constraint_span(&self, constraint_name: &str, column: Option<&str>) -> TermSpan {
511 let mut span = self
512 .tracer
513 .start(format!("validation_constraint.{constraint_name}"));
514
515 span.set_attribute(opentelemetry::KeyValue::new(
516 "validation.constraint.name",
517 constraint_name.to_string(),
518 ));
519 span.set_attribute(opentelemetry::KeyValue::new(
520 "validation.type",
521 "constraint",
522 ));
523
524 if let Some(col) = column {
525 span.set_attribute(opentelemetry::KeyValue::new(
526 "validation.constraint.column",
527 col.to_string(),
528 ));
529 }
530
531 for (key, value) in &self.custom_attributes {
532 span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
533 }
534
535 TermSpan::new(span)
536 }
537
538 #[cfg(not(feature = "telemetry"))]
539 pub fn start_constraint_span(&self, _constraint_name: &str, _column: Option<&str>) -> TermSpan {
540 TermSpan::noop()
541 }
542
543 #[cfg(feature = "telemetry")]
545 pub fn start_datasource_span(&self, source_type: &str, table_name: &str) -> TermSpan {
546 let mut span = self.tracer.start(format!("data_source.{source_type}"));
547
548 span.set_attribute(opentelemetry::KeyValue::new(
549 "data_source.type",
550 source_type.to_string(),
551 ));
552 span.set_attribute(opentelemetry::KeyValue::new(
553 "data_source.table_name",
554 table_name.to_string(),
555 ));
556 span.set_attribute(opentelemetry::KeyValue::new(
557 "validation.type",
558 "data_source",
559 ));
560
561 for (key, value) in &self.custom_attributes {
562 span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
563 }
564
565 TermSpan::new(span)
566 }
567
568 #[cfg(not(feature = "telemetry"))]
569 pub fn start_datasource_span(&self, _source_type: &str, _table_name: &str) -> TermSpan {
570 TermSpan::noop()
571 }
572}
573
574impl Clone for TermTelemetry {
576 fn clone(&self) -> Self {
577 Self {
578 #[cfg(feature = "telemetry")]
579 tracer: opentelemetry::global::tracer("noop"), #[cfg(feature = "telemetry")]
581 metrics: self.metrics.clone(), detailed_metrics: self.detailed_metrics,
583 record_timing: self.record_timing,
584 custom_attributes: self.custom_attributes.clone(),
585 }
586 }
587}
588
/// Handle to a tracing span created by `TermTelemetry`.
///
/// With the `telemetry` feature it wraps an OpenTelemetry span that is ended
/// when this value is dropped. Without the feature it is an empty placeholder.
#[must_use = "the span ends as soon as the TermSpan is dropped; bind it to a variable"]
pub struct TermSpan {
    #[cfg(feature = "telemetry")]
    span: BoxedSpan,

    #[cfg(not(feature = "telemetry"))]
    _phantom: std::marker::PhantomData<()>,
}
597
598impl TermSpan {
599 #[cfg(feature = "telemetry")]
600 fn new(span: BoxedSpan) -> Self {
601 Self { span }
602 }
603
604 pub fn noop() -> Self {
607 Self {
608 #[cfg(feature = "telemetry")]
609 span: opentelemetry::global::tracer("noop").start("noop"),
610 #[cfg(not(feature = "telemetry"))]
611 _phantom: std::marker::PhantomData,
612 }
613 }
614
615 #[cfg(feature = "telemetry")]
617 pub fn add_event(&mut self, name: impl Into<String>, attributes: Vec<opentelemetry::KeyValue>) {
618 self.span.add_event(name.into(), attributes);
619 }
620
621 #[cfg(not(feature = "telemetry"))]
622 pub fn add_event(&mut self, _name: impl Into<String>, _attributes: Vec<()>) {
623 }
625
626 #[cfg(feature = "telemetry")]
628 pub fn set_attribute(&mut self, kv: opentelemetry::KeyValue) {
629 self.span.set_attribute(kv);
630 }
631
632 #[cfg(not(feature = "telemetry"))]
633 pub fn set_attribute(&mut self, _kv: ()) {
634 }
636
637 #[cfg(feature = "telemetry")]
639 pub fn set_status(&mut self, status: Status) {
640 self.span.set_status(status);
641 }
642
643 #[cfg(not(feature = "telemetry"))]
644 pub fn set_status(&mut self, _status: ()) {
645 }
647
648 #[cfg(feature = "telemetry")]
650 pub fn record_error(&mut self, error: &dyn std::error::Error) {
651 self.span.record_error(error);
652 self.span.set_status(Status::Error {
653 description: error.to_string().into(),
654 });
655 }
656
657 #[cfg(not(feature = "telemetry"))]
658 pub fn record_error(&mut self, _error: &dyn std::error::Error) {
659 }
661}
662
663impl Drop for TermSpan {
664 #[cfg(feature = "telemetry")]
665 fn drop(&mut self) {
666 self.span.end();
667 }
668
669 #[cfg(not(feature = "telemetry"))]
670 fn drop(&mut self) {
671 }
673}
674
675pub mod utils {
677 use super::*;
678
679 #[cfg(feature = "telemetry")]
681 pub fn record_validation_metrics(
682 span: &mut TermSpan,
683 passed: u32,
684 failed: u32,
685 skipped: u32,
686 duration_ms: u64,
687 ) {
688 span.set_attribute(opentelemetry::KeyValue::new(
689 "validation.metrics.passed",
690 passed as i64,
691 ));
692 span.set_attribute(opentelemetry::KeyValue::new(
693 "validation.metrics.failed",
694 failed as i64,
695 ));
696 span.set_attribute(opentelemetry::KeyValue::new(
697 "validation.metrics.skipped",
698 skipped as i64,
699 ));
700 span.set_attribute(opentelemetry::KeyValue::new(
701 "validation.metrics.total",
702 (passed + failed + skipped) as i64,
703 ));
704 span.set_attribute(opentelemetry::KeyValue::new(
705 "validation.duration_ms",
706 duration_ms as i64,
707 ));
708 }
709
710 #[cfg(not(feature = "telemetry"))]
711 pub fn record_validation_metrics(
712 _span: &mut TermSpan,
713 _passed: u32,
714 _failed: u32,
715 _skipped: u32,
716 _duration_ms: u64,
717 ) {
718 }
720
721 #[cfg(feature = "telemetry")]
723 pub fn record_constraint_result(span: &mut TermSpan, result: &crate::core::ConstraintResult) {
724 use crate::core::ConstraintStatus;
725
726 let status_str = match result.status {
727 ConstraintStatus::Success => "success",
728 ConstraintStatus::Failure => "failure",
729 ConstraintStatus::Skipped => "skipped",
730 };
731
732 span.set_attribute(opentelemetry::KeyValue::new(
733 "validation.constraint.status",
734 status_str,
735 ));
736
737 if let Some(metric) = result.metric {
738 span.set_attribute(opentelemetry::KeyValue::new(
739 "validation.constraint.metric",
740 metric,
741 ));
742 }
743
744 if let Some(ref message) = result.message {
745 span.set_attribute(opentelemetry::KeyValue::new(
746 "validation.constraint.message",
747 message.clone(),
748 ));
749 }
750 }
751
752 #[cfg(not(feature = "telemetry"))]
753 pub fn record_constraint_result(_span: &mut TermSpan, _result: &crate::core::ConstraintResult) {
754 }
756}
757
#[cfg(test)]
mod tests {
    use super::*;

    /// `disabled()` turns both toggles off.
    #[test]
    fn test_disabled_telemetry() {
        let disabled = TermTelemetry::disabled();
        assert!(!disabled.detailed_metrics);
        assert!(!disabled.record_timing);
    }

    /// Builder methods update the flags and accumulate custom attributes.
    #[test]
    fn test_telemetry_configuration() {
        let configured = TermTelemetry::disabled()
            .with_detailed_metrics(true)
            .with_timing(true)
            .with_attribute("service.name", "test_service")
            .with_attributes([("env", "test"), ("version", "1.0.0")]);

        assert!(configured.detailed_metrics);
        assert!(configured.record_timing);

        let expected = [
            ("service.name", "test_service"),
            ("env", "test"),
            ("version", "1.0.0"),
        ];
        for (key, value) in expected.iter() {
            assert_eq!(
                configured.custom_attributes.get(*key).map(String::as_str),
                Some(*value),
                "custom attribute {key:?}"
            );
        }
    }

    /// Every span operation is callable on a span from disabled telemetry.
    #[test]
    fn test_noop_span_operations() {
        let disabled = TermTelemetry::disabled();
        let mut suite_span = disabled.start_suite_span("test_suite", 5);
        let failure = std::io::Error::new(std::io::ErrorKind::Other, "test error");

        suite_span.add_event("test_event", vec![]);

        #[cfg(feature = "telemetry")]
        suite_span.set_attribute(opentelemetry::KeyValue::new("test_key", "test_value"));

        suite_span.record_error(&failure);
    }

    /// All span kinds can be started from a tracer obtained from the global API.
    #[cfg(feature = "telemetry")]
    #[test]
    fn test_telemetry_with_noop_tracer() {
        let telemetry = TermTelemetry::new(opentelemetry::global::tracer("test"));

        let _suite = telemetry.start_suite_span("test_suite", 3);
        let _check = telemetry.start_check_span("test_check", 2);
        let _constraint =
            telemetry.start_constraint_span("test_constraint", Some("test_column"));
        let _source = telemetry.start_datasource_span("csv", "test_table");
    }
}