1#[cfg(feature = "telemetry")]
84use opentelemetry::{
85 global::{BoxedSpan, BoxedTracer},
86 metrics::{Counter, Histogram, Meter, ObservableGauge},
87 trace::{Span, Status, Tracer},
88 KeyValue,
89};
90
91#[cfg(feature = "telemetry")]
92use std::sync::atomic::{AtomicU64, Ordering};
93#[cfg(feature = "telemetry")]
94use std::sync::Arc;
95
96#[cfg(feature = "telemetry")]
98fn get_memory_usage() -> Result<u64, std::io::Error> {
99 #[cfg(target_os = "linux")]
102 {
103 use std::fs;
104 let status = fs::read_to_string("/proc/self/status")?;
105 for line in status.lines() {
106 if line.starts_with("VmRSS:") {
107 let parts: Vec<&str> = line.split_whitespace().collect();
108 if parts.len() >= 2 {
109 if let Ok(kb) = parts[1].parse::<u64>() {
110 return Ok(kb * 1024); }
112 }
113 }
114 }
115 Err(std::io::Error::new(
116 std::io::ErrorKind::NotFound,
117 "VmRSS not found",
118 ))
119 }
120
121 #[cfg(not(target_os = "linux"))]
122 {
123 Ok(0)
125 }
126}
127
128#[cfg(feature = "telemetry")]
133pub struct ValidationMetrics {
134 validation_duration: Histogram<f64>,
136 check_duration: Histogram<f64>,
137 data_load_duration: Histogram<f64>,
138
139 rows_processed: Counter<u64>,
141 validation_runs: Counter<u64>,
142 validation_failures: Counter<u64>,
143 checks_passed: Counter<u64>,
144 checks_failed: Counter<u64>,
145
146 active_validations: Arc<AtomicU64>,
148 memory_usage_bytes: Option<ObservableGauge<u64>>,
149
150 custom_metrics: Histogram<f64>,
152}
153
154#[cfg(feature = "telemetry")]
155impl ValidationMetrics {
156 pub fn new(meter: &Meter) -> crate::prelude::Result<Self> {
158 let active_validations = Arc::new(AtomicU64::new(0));
159
160 let memory_usage = meter
162 .u64_observable_gauge("data.validation.memory")
163 .with_description("Memory usage of validation process in bytes")
164 .with_unit("By")
165 .with_callback(move |observer| {
166 if let Ok(usage) = get_memory_usage() {
168 observer.observe(usage, &[]);
169 }
170 })
171 .try_init()
172 .ok();
173
174 Ok(Self {
175 validation_duration: meter
177 .f64_histogram("data.validation.duration")
178 .with_description("Duration of complete validation suite execution")
179 .with_unit("s")
180 .init(),
181
182 check_duration: meter
183 .f64_histogram("data.validation.check.duration")
184 .with_description("Duration of individual validation checks")
185 .with_unit("s")
186 .init(),
187
188 data_load_duration: meter
189 .f64_histogram("data.processing.load.duration")
190 .with_description("Time to load data for validation")
191 .with_unit("s")
192 .init(),
193
194 rows_processed: meter
196 .u64_counter("data.validation.rows")
197 .with_description("Total number of rows processed during validation")
198 .with_unit("1")
199 .init(),
200
201 validation_runs: meter
202 .u64_counter("data.validation.total")
203 .with_description("Total number of validation runs")
204 .with_unit("1")
205 .init(),
206
207 validation_failures: meter
208 .u64_counter("data.validation.failures")
209 .with_description("Total number of failed validations")
210 .with_unit("1")
211 .init(),
212
213 checks_passed: meter
214 .u64_counter("data.validation.checks.passed")
215 .with_description("Total number of passed checks")
216 .with_unit("1")
217 .init(),
218
219 checks_failed: meter
220 .u64_counter("data.validation.checks.failed")
221 .with_description("Total number of failed checks")
222 .with_unit("1")
223 .init(),
224
225 active_validations,
227 memory_usage_bytes: memory_usage,
228
229 custom_metrics: meter
231 .f64_histogram("data.validation.custom_metric")
232 .with_description("Custom business metrics from validation constraints")
233 .with_unit("1")
234 .init(),
235 })
236 }
237
238 pub fn record_validation_duration(&self, duration_secs: f64, attributes: &[KeyValue]) {
240 self.validation_duration.record(duration_secs, attributes);
241 }
242
243 pub fn record_check_duration(&self, duration_secs: f64, attributes: &[KeyValue]) {
245 self.check_duration.record(duration_secs, attributes);
246 }
247
248 pub fn record_data_load_duration(&self, duration_secs: f64, attributes: &[KeyValue]) {
250 self.data_load_duration.record(duration_secs, attributes);
251 }
252
253 pub fn add_rows_processed(&self, count: u64, attributes: &[KeyValue]) {
255 self.rows_processed.add(count, attributes);
256 }
257
258 pub fn increment_validation_runs(&self, attributes: &[KeyValue]) {
260 self.validation_runs.add(1, attributes);
261 }
262
263 pub fn increment_validation_failures(&self, attributes: &[KeyValue]) {
265 self.validation_failures.add(1, attributes);
266 }
267
268 pub fn increment_checks_passed(&self, attributes: &[KeyValue]) {
270 self.checks_passed.add(1, attributes);
271 }
272
273 pub fn increment_checks_failed(&self, attributes: &[KeyValue]) {
275 self.checks_failed.add(1, attributes);
276 }
277
278 pub fn start_validation(&self) -> ActiveValidationGuard {
280 self.active_validations.fetch_add(1, Ordering::Relaxed);
281 ActiveValidationGuard {
282 counter: Arc::clone(&self.active_validations),
283 }
284 }
285
286 pub fn record_custom_metric(&self, value: f64, attributes: &[KeyValue]) {
288 self.custom_metrics.record(value, attributes);
289 }
290}
291
292#[cfg(feature = "telemetry")]
294impl std::fmt::Debug for ValidationMetrics {
295 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296 f.debug_struct("ValidationMetrics")
297 .field(
298 "active_validations",
299 &self.active_validations.load(Ordering::Relaxed),
300 )
301 .field("has_memory_gauge", &self.memory_usage_bytes.is_some())
302 .finish()
303 }
304}
305
306#[cfg(feature = "telemetry")]
308pub struct ActiveValidationGuard {
309 counter: Arc<AtomicU64>,
310}
311
312#[cfg(feature = "telemetry")]
313impl Drop for ActiveValidationGuard {
314 fn drop(&mut self) {
315 self.counter.fetch_sub(1, Ordering::Relaxed);
316 }
317}
318
319#[derive(Debug)]
325pub struct TermTelemetry {
326 #[cfg(feature = "telemetry")]
327 tracer: BoxedTracer,
328
329 #[cfg(feature = "telemetry")]
330 metrics: Option<Arc<ValidationMetrics>>,
331
332 pub detailed_metrics: bool,
334
335 pub record_timing: bool,
337
338 pub custom_attributes: std::collections::HashMap<String, String>,
340}
341
342impl TermTelemetry {
343 #[cfg(feature = "telemetry")]
361 pub fn new(tracer: BoxedTracer) -> Self {
362 Self {
363 tracer,
364 metrics: None,
365 detailed_metrics: true,
366 record_timing: true,
367 custom_attributes: std::collections::HashMap::new(),
368 }
369 }
370
371 pub fn disabled() -> Self {
376 Self {
377 #[cfg(feature = "telemetry")]
378 tracer: opentelemetry::global::tracer("noop"),
379 #[cfg(feature = "telemetry")]
380 metrics: None,
381 detailed_metrics: false,
382 record_timing: false,
383 custom_attributes: std::collections::HashMap::new(),
384 }
385 }
386
387 #[cfg(feature = "telemetry")]
403 pub fn with_meter(mut self, meter: &Meter) -> crate::prelude::Result<Self> {
404 self.metrics = Some(Arc::new(ValidationMetrics::new(meter)?));
405 Ok(self)
406 }
407
408 #[cfg(feature = "telemetry")]
410 pub fn metrics(&self) -> Option<&Arc<ValidationMetrics>> {
411 self.metrics.as_ref()
412 }
413
414 pub fn with_detailed_metrics(mut self, enabled: bool) -> Self {
416 self.detailed_metrics = enabled;
417 self
418 }
419
420 pub fn with_timing(mut self, enabled: bool) -> Self {
422 self.record_timing = enabled;
423 self
424 }
425
426 pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
428 self.custom_attributes.insert(key.into(), value.into());
429 self
430 }
431
432 pub fn with_attributes<I, K, V>(mut self, attributes: I) -> Self
434 where
435 I: IntoIterator<Item = (K, V)>,
436 K: Into<String>,
437 V: Into<String>,
438 {
439 for (key, value) in attributes {
440 self.custom_attributes.insert(key.into(), value.into());
441 }
442 self
443 }
444
445 #[cfg(feature = "telemetry")]
452 pub fn start_suite_span(&self, suite_name: &str, check_count: usize) -> TermSpan {
453 let mut span = self.tracer.start(format!("validation_suite.{suite_name}"));
454
455 span.set_attribute(opentelemetry::KeyValue::new(
457 "validation.suite.name",
458 suite_name.to_string(),
459 ));
460 span.set_attribute(opentelemetry::KeyValue::new(
461 "validation.suite.check_count",
462 check_count as i64,
463 ));
464 span.set_attribute(opentelemetry::KeyValue::new("validation.type", "suite"));
465
466 for (key, value) in &self.custom_attributes {
468 span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
469 }
470
471 TermSpan::new(span)
472 }
473
474 #[cfg(not(feature = "telemetry"))]
476 pub fn start_suite_span(&self, _suite_name: &str, _check_count: usize) -> TermSpan {
477 TermSpan::noop()
478 }
479
480 #[cfg(feature = "telemetry")]
482 pub fn start_check_span(&self, check_name: &str, constraint_count: usize) -> TermSpan {
483 let mut span = self.tracer.start(format!("validation_check.{check_name}"));
484
485 span.set_attribute(opentelemetry::KeyValue::new(
486 "validation.check.name",
487 check_name.to_string(),
488 ));
489 span.set_attribute(opentelemetry::KeyValue::new(
490 "validation.check.constraint_count",
491 constraint_count as i64,
492 ));
493 span.set_attribute(opentelemetry::KeyValue::new("validation.type", "check"));
494
495 for (key, value) in &self.custom_attributes {
496 span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
497 }
498
499 TermSpan::new(span)
500 }
501
502 #[cfg(not(feature = "telemetry"))]
503 pub fn start_check_span(&self, _check_name: &str, _constraint_count: usize) -> TermSpan {
504 TermSpan::noop()
505 }
506
507 #[cfg(feature = "telemetry")]
509 pub fn start_constraint_span(&self, constraint_name: &str, column: Option<&str>) -> TermSpan {
510 let mut span = self
511 .tracer
512 .start(format!("validation_constraint.{constraint_name}"));
513
514 span.set_attribute(opentelemetry::KeyValue::new(
515 "validation.constraint.name",
516 constraint_name.to_string(),
517 ));
518 span.set_attribute(opentelemetry::KeyValue::new(
519 "validation.type",
520 "constraint",
521 ));
522
523 if let Some(col) = column {
524 span.set_attribute(opentelemetry::KeyValue::new(
525 "validation.constraint.column",
526 col.to_string(),
527 ));
528 }
529
530 for (key, value) in &self.custom_attributes {
531 span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
532 }
533
534 TermSpan::new(span)
535 }
536
537 #[cfg(not(feature = "telemetry"))]
538 pub fn start_constraint_span(&self, _constraint_name: &str, _column: Option<&str>) -> TermSpan {
539 TermSpan::noop()
540 }
541
542 #[cfg(feature = "telemetry")]
544 pub fn start_datasource_span(&self, source_type: &str, table_name: &str) -> TermSpan {
545 let mut span = self.tracer.start(format!("data_source.{source_type}"));
546
547 span.set_attribute(opentelemetry::KeyValue::new(
548 "data_source.type",
549 source_type.to_string(),
550 ));
551 span.set_attribute(opentelemetry::KeyValue::new(
552 "data_source.table_name",
553 table_name.to_string(),
554 ));
555 span.set_attribute(opentelemetry::KeyValue::new(
556 "validation.type",
557 "data_source",
558 ));
559
560 for (key, value) in &self.custom_attributes {
561 span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
562 }
563
564 TermSpan::new(span)
565 }
566
567 #[cfg(not(feature = "telemetry"))]
568 pub fn start_datasource_span(&self, _source_type: &str, _table_name: &str) -> TermSpan {
569 TermSpan::noop()
570 }
571}
572
573impl Clone for TermTelemetry {
575 fn clone(&self) -> Self {
576 Self {
577 #[cfg(feature = "telemetry")]
578 tracer: opentelemetry::global::tracer("noop"), #[cfg(feature = "telemetry")]
580 metrics: self.metrics.clone(), detailed_metrics: self.detailed_metrics,
582 record_timing: self.record_timing,
583 custom_attributes: self.custom_attributes.clone(),
584 }
585 }
586}
587
588pub struct TermSpan {
590 #[cfg(feature = "telemetry")]
591 span: BoxedSpan,
592
593 #[cfg(not(feature = "telemetry"))]
594 _phantom: std::marker::PhantomData<()>,
595}
596
597impl TermSpan {
598 #[cfg(feature = "telemetry")]
599 fn new(span: BoxedSpan) -> Self {
600 Self { span }
601 }
602
603 pub fn noop() -> Self {
606 Self {
607 #[cfg(feature = "telemetry")]
608 span: opentelemetry::global::tracer("noop").start("noop"),
609 #[cfg(not(feature = "telemetry"))]
610 _phantom: std::marker::PhantomData,
611 }
612 }
613
614 #[cfg(feature = "telemetry")]
616 pub fn add_event(&mut self, name: impl Into<String>, attributes: Vec<opentelemetry::KeyValue>) {
617 self.span.add_event(name.into(), attributes);
618 }
619
620 #[cfg(not(feature = "telemetry"))]
621 pub fn add_event(&mut self, _name: impl Into<String>, _attributes: Vec<()>) {
622 }
624
625 #[cfg(feature = "telemetry")]
627 pub fn set_attribute(&mut self, kv: opentelemetry::KeyValue) {
628 self.span.set_attribute(kv);
629 }
630
631 #[cfg(not(feature = "telemetry"))]
632 pub fn set_attribute(&mut self, _kv: ()) {
633 }
635
636 #[cfg(feature = "telemetry")]
638 pub fn set_status(&mut self, status: Status) {
639 self.span.set_status(status);
640 }
641
642 #[cfg(not(feature = "telemetry"))]
643 pub fn set_status(&mut self, _status: ()) {
644 }
646
647 #[cfg(feature = "telemetry")]
649 pub fn record_error(&mut self, error: &dyn std::error::Error) {
650 self.span.record_error(error);
651 self.span.set_status(Status::Error {
652 description: error.to_string().into(),
653 });
654 }
655
656 #[cfg(not(feature = "telemetry"))]
657 pub fn record_error(&mut self, _error: &dyn std::error::Error) {
658 }
660}
661
662impl Drop for TermSpan {
663 #[cfg(feature = "telemetry")]
664 fn drop(&mut self) {
665 self.span.end();
666 }
667
668 #[cfg(not(feature = "telemetry"))]
669 fn drop(&mut self) {
670 }
672}
673
674pub mod utils {
676 use super::*;
677
678 #[cfg(feature = "telemetry")]
680 pub fn record_validation_metrics(
681 span: &mut TermSpan,
682 passed: u32,
683 failed: u32,
684 skipped: u32,
685 duration_ms: u64,
686 ) {
687 span.set_attribute(opentelemetry::KeyValue::new(
688 "validation.metrics.passed",
689 passed as i64,
690 ));
691 span.set_attribute(opentelemetry::KeyValue::new(
692 "validation.metrics.failed",
693 failed as i64,
694 ));
695 span.set_attribute(opentelemetry::KeyValue::new(
696 "validation.metrics.skipped",
697 skipped as i64,
698 ));
699 span.set_attribute(opentelemetry::KeyValue::new(
700 "validation.metrics.total",
701 (passed + failed + skipped) as i64,
702 ));
703 span.set_attribute(opentelemetry::KeyValue::new(
704 "validation.duration_ms",
705 duration_ms as i64,
706 ));
707 }
708
709 #[cfg(not(feature = "telemetry"))]
710 pub fn record_validation_metrics(
711 _span: &mut TermSpan,
712 _passed: u32,
713 _failed: u32,
714 _skipped: u32,
715 _duration_ms: u64,
716 ) {
717 }
719
720 #[cfg(feature = "telemetry")]
722 pub fn record_constraint_result(span: &mut TermSpan, result: &crate::core::ConstraintResult) {
723 use crate::core::ConstraintStatus;
724
725 let status_str = match result.status {
726 ConstraintStatus::Success => "success",
727 ConstraintStatus::Failure => "failure",
728 ConstraintStatus::Skipped => "skipped",
729 };
730
731 span.set_attribute(opentelemetry::KeyValue::new(
732 "validation.constraint.status",
733 status_str,
734 ));
735
736 if let Some(metric) = result.metric {
737 span.set_attribute(opentelemetry::KeyValue::new(
738 "validation.constraint.metric",
739 metric,
740 ));
741 }
742
743 if let Some(ref message) = result.message {
744 span.set_attribute(opentelemetry::KeyValue::new(
745 "validation.constraint.message",
746 message.clone(),
747 ));
748 }
749 }
750
751 #[cfg(not(feature = "telemetry"))]
752 pub fn record_constraint_result(_span: &mut TermSpan, _result: &crate::core::ConstraintResult) {
753 }
755}
756
757#[cfg(test)]
758mod tests {
759 use super::*;
760
761 #[test]
762 fn test_disabled_telemetry() {
763 let telemetry = TermTelemetry::disabled();
764 assert!(!telemetry.detailed_metrics);
765 assert!(!telemetry.record_timing);
766 }
767
768 #[test]
769 fn test_telemetry_configuration() {
770 let telemetry = TermTelemetry::disabled()
771 .with_detailed_metrics(true)
772 .with_timing(true)
773 .with_attribute("service.name", "test_service")
774 .with_attributes([("env", "test"), ("version", "1.0.0")]);
775
776 assert!(telemetry.detailed_metrics);
777 assert!(telemetry.record_timing);
778 assert_eq!(
779 telemetry.custom_attributes.get("service.name"),
780 Some(&"test_service".to_string())
781 );
782 assert_eq!(
783 telemetry.custom_attributes.get("env"),
784 Some(&"test".to_string())
785 );
786 assert_eq!(
787 telemetry.custom_attributes.get("version"),
788 Some(&"1.0.0".to_string())
789 );
790 }
791
792 #[test]
793 fn test_noop_span_operations() {
794 let telemetry = TermTelemetry::disabled();
795 let mut span = telemetry.start_suite_span("test_suite", 5);
796
797 span.add_event("test_event", vec![]);
799 #[cfg(feature = "telemetry")]
800 span.set_attribute(opentelemetry::KeyValue::new("test_key", "test_value"));
801 span.record_error(&std::io::Error::new(
802 std::io::ErrorKind::Other,
803 "test error",
804 ));
805 }
806
807 #[cfg(feature = "telemetry")]
808 #[test]
809 fn test_telemetry_with_noop_tracer() {
810 let tracer = opentelemetry::global::tracer("test");
811 let telemetry = TermTelemetry::new(tracer);
812
813 let _suite_span = telemetry.start_suite_span("test_suite", 3);
815 let _check_span = telemetry.start_check_span("test_check", 2);
816 let _constraint_span =
817 telemetry.start_constraint_span("test_constraint", Some("test_column"));
818 let _datasource_span = telemetry.start_datasource_span("csv", "test_table");
819 }
820}