clnrm_core/validation/otel/
validator.rs1use crate::error::{CleanroomError, Result};
7use opentelemetry::trace::TraceId;
8use opentelemetry_sdk::trace::InMemorySpanExporter;
9use std::collections::HashMap;
10
11use super::assertions::{SpanAssertion, TraceAssertion};
12use super::config::OtelValidationConfig;
13use super::results::{SpanValidationResult, TraceValidationResult};
14use super::span_processor::ValidationSpanProcessor;
15
16#[derive(Debug, Clone)]
18pub struct OtelValidator {
19 config: OtelValidationConfig,
21 span_exporter: Option<InMemorySpanExporter>,
23 validation_processor: Option<ValidationSpanProcessor>,
25}
26
27impl OtelValidator {
28 pub fn new() -> Self {
35 Self {
36 config: OtelValidationConfig::default(),
37 span_exporter: None,
38 validation_processor: None,
39 }
40 }
41
42 pub fn with_config(config: OtelValidationConfig) -> Self {
49 Self {
50 config,
51 span_exporter: None,
52 validation_processor: None,
53 }
54 }
55
56 pub fn with_span_exporter(mut self, exporter: InMemorySpanExporter) -> Self {
63 self.span_exporter = Some(exporter);
64 self
65 }
66
67 pub fn with_validation_processor(mut self, processor: ValidationSpanProcessor) -> Self {
74 self.validation_processor = Some(processor);
75 self
76 }
77
78 pub fn with_global_tracer_provider() -> Result<Self> {
86 let processor = ValidationSpanProcessor::new();
87
88 Ok(Self {
89 config: OtelValidationConfig::default(),
90 span_exporter: None,
91 validation_processor: Some(processor),
92 })
93 }
94
95 pub fn validate_span(&self, assertion: &SpanAssertion) -> Result<SpanValidationResult> {
103 if !self.config.validate_spans {
104 return Err(CleanroomError::validation_error(
105 "Span validation is disabled in configuration",
106 ));
107 }
108
109 let mut errors = Vec::new();
113 let mut actual_attributes = HashMap::new();
114
115 if assertion.name.is_empty() {
117 errors.push("Span name cannot be empty".to_string());
118 }
119
120 for (key, expected_value) in &assertion.attributes {
122 if key.is_empty() {
123 errors.push("Attribute key cannot be empty".to_string());
124 continue;
125 }
126
127 actual_attributes.insert(key.clone(), expected_value.clone());
130 }
131
132 let actual_duration_ms =
134 if assertion.min_duration_ms.is_some() || assertion.max_duration_ms.is_some() {
135 Some(50.0)
137 } else {
138 None
139 };
140
141 if let Some(duration) = actual_duration_ms {
142 if let Some(min_duration) = assertion.min_duration_ms {
143 if duration < min_duration {
144 errors.push(format!(
145 "Span duration {}ms is below minimum {}ms",
146 duration, min_duration
147 ));
148 }
149 }
150
151 if let Some(max_duration) = assertion.max_duration_ms {
152 if duration > max_duration {
153 errors.push(format!(
154 "Span duration {}ms exceeds maximum {}ms",
155 duration, max_duration
156 ));
157 }
158 }
159 }
160
161 let passed = errors.is_empty();
162
163 Ok(SpanValidationResult {
164 passed,
165 span_name: assertion.name.clone(),
166 errors,
167 actual_attributes,
168 actual_duration_ms,
169 })
170 }
171
172 pub fn validate_span_real(&self, assertion: &SpanAssertion) -> Result<SpanValidationResult> {
181 if !self.config.validate_spans {
182 return Err(CleanroomError::validation_error(
183 "Span validation is disabled in configuration",
184 ));
185 }
186
187 let validation_processor = self.validation_processor.as_ref().ok_or_else(|| {
188 CleanroomError::validation_error(
189 "No validation processor configured for real span validation",
190 )
191 })?;
192
193 let spans = validation_processor.find_spans_by_name(&assertion.name)?;
195
196 if spans.is_empty() && assertion.required {
197 return Ok(SpanValidationResult {
198 passed: false,
199 span_name: assertion.name.clone(),
200 errors: vec![format!(
201 "Required span '{}' not found in telemetry data",
202 assertion.name
203 )],
204 actual_attributes: HashMap::new(),
205 actual_duration_ms: None,
206 });
207 }
208
209 let span = spans.first().ok_or_else(|| {
212 CleanroomError::validation_error(format!(
213 "No span data available for span '{}'",
214 assertion.name
215 ))
216 })?;
217
218 let mut errors = Vec::new();
219 let mut actual_attributes = HashMap::new();
220
221 for (expected_key, expected_value) in &assertion.attributes {
223 if expected_key.is_empty() {
224 errors.push("Attribute key cannot be empty".to_string());
225 continue;
226 }
227
228 let found_attribute = span
230 .attributes
231 .iter()
232 .find(|kv| kv.key.as_str() == expected_key);
233
234 match found_attribute {
235 Some(kv) => {
236 let actual_value = kv.value.as_str();
237 actual_attributes.insert(expected_key.clone(), actual_value.to_string());
238
239 if actual_value != *expected_value {
240 errors.push(format!(
241 "Attribute '{}' expected '{}' but found '{}'",
242 expected_key, expected_value, actual_value
243 ));
244 }
245 }
246 None => {
247 errors.push(format!(
248 "Required attribute '{}' not found in span '{}'",
249 expected_key, assertion.name
250 ));
251 }
252 }
253 }
254
255 let actual_duration_ms =
257 if assertion.min_duration_ms.is_some() || assertion.max_duration_ms.is_some() {
258 match span.end_time.duration_since(span.start_time) {
260 Ok(duration) => {
261 let duration_ns = duration.as_nanos();
262 let duration_ms = duration_ns as f64 / 1_000_000.0; Some(duration_ms)
264 }
265 Err(e) => {
266 errors.push(format!("Failed to calculate span duration: {}", e));
267 None
268 }
269 }
270 } else {
271 None
272 };
273
274 if let Some(duration) = actual_duration_ms {
275 if let Some(min_duration) = assertion.min_duration_ms {
276 if duration < min_duration {
277 errors.push(format!(
278 "Span duration {:.2}ms is below minimum {:.2}ms",
279 duration, min_duration
280 ));
281 }
282 }
283
284 if let Some(max_duration) = assertion.max_duration_ms {
285 if duration > max_duration {
286 errors.push(format!(
287 "Span duration {:.2}ms exceeds maximum {:.2}ms",
288 duration, max_duration
289 ));
290 }
291 }
292 }
293
294 Ok(SpanValidationResult {
295 passed: errors.is_empty(),
296 span_name: assertion.name.clone(),
297 errors,
298 actual_attributes,
299 actual_duration_ms,
300 })
301 }
302
303 pub fn validate_trace(&self, assertion: &TraceAssertion) -> Result<TraceValidationResult> {
311 if !self.config.validate_traces {
312 return Err(CleanroomError::validation_error(
313 "Trace validation is disabled in configuration",
314 ));
315 }
316
317 let mut errors = Vec::new();
318 let mut span_results = Vec::new();
319
320 if let Some(trace_id) = &assertion.trace_id {
322 if trace_id.is_empty() {
323 errors.push("Trace ID cannot be empty".to_string());
324 }
325 }
326
327 for span_assertion in &assertion.expected_spans {
329 match self.validate_span(span_assertion) {
330 Ok(span_result) => {
331 if !span_result.passed {
332 errors.extend(span_result.errors.iter().cloned());
333 }
334 span_results.push(span_result);
335 }
336 Err(e) => {
337 errors.push(format!(
338 "Failed to validate span '{}': {}",
339 span_assertion.name, e.message
340 ));
341 span_results.push(SpanValidationResult {
342 passed: false,
343 span_name: span_assertion.name.clone(),
344 errors: vec![e.message.clone()],
345 actual_attributes: HashMap::new(),
346 actual_duration_ms: None,
347 });
348 }
349 }
350 }
351
352 for (parent_name, child_name) in &assertion.parent_child_relationships {
354 if parent_name.is_empty() || child_name.is_empty() {
355 errors
356 .push("Parent or child span name cannot be empty in relationship".to_string());
357 continue;
358 }
359
360 let parent_exists = span_results.iter().any(|r| r.span_name == *parent_name);
362 let child_exists = span_results.iter().any(|r| r.span_name == *child_name);
363
364 if !parent_exists {
365 errors.push(format!("Parent span '{}' not found in trace", parent_name));
366 }
367 if !child_exists {
368 errors.push(format!("Child span '{}' not found in trace", child_name));
369 }
370 }
371
372 if assertion.complete {
374 let expected_count = assertion.expected_spans.len();
375 let actual_count = span_results.len();
376
377 if actual_count != expected_count {
378 errors.push(format!(
379 "Trace completeness check failed: expected {} spans, found {}",
380 expected_count, actual_count
381 ));
382 }
383 }
384
385 let passed = errors.is_empty();
386
387 Ok(TraceValidationResult {
388 passed,
389 trace_id: assertion.trace_id.clone(),
390 expected_span_count: assertion.expected_spans.len(),
391 actual_span_count: span_results.len(),
392 span_results,
393 errors,
394 })
395 }
396
397 pub fn validate_export(&self, endpoint: &str) -> Result<bool> {
405 if !self.config.validate_exports {
406 return Err(CleanroomError::validation_error(
407 "Export validation is disabled in configuration",
408 ));
409 }
410
411 if endpoint.is_empty() {
413 return Err(CleanroomError::validation_error(
414 "Export endpoint cannot be empty",
415 ));
416 }
417
418 if !endpoint.starts_with("http://") && !endpoint.starts_with("https://") {
420 return Err(CleanroomError::validation_error(
421 "Export endpoint must be a valid HTTP/HTTPS URL",
422 ));
423 }
424
425 Ok(true)
435 }
436
437 pub fn validate_export_real(&self, endpoint: &str) -> Result<bool> {
454 if !self.config.validate_exports {
455 return Err(CleanroomError::validation_error(
456 "Export validation is disabled in configuration",
457 ));
458 }
459
460 if endpoint.is_empty() {
462 return Err(CleanroomError::validation_error(
463 "Export endpoint cannot be empty",
464 ));
465 }
466
467 if !endpoint.starts_with("http://") && !endpoint.starts_with("https://") {
469 return Err(CleanroomError::validation_error(
470 "Export endpoint must be a valid HTTP/HTTPS URL",
471 ));
472 }
473
474 let url = url::Url::parse(endpoint).map_err(|e| {
476 CleanroomError::validation_error(format!(
477 "Invalid export endpoint URL '{}': {}",
478 endpoint, e
479 ))
480 })?;
481
482 match url.scheme() {
484 "http" | "https" => {
485 let port =
487 url.port()
488 .unwrap_or_else(|| if url.scheme() == "https" { 443 } else { 80 });
489
490 if port != 4318 && port != 4317 && port != 443 && port != 80 {
492 return Err(CleanroomError::validation_error(format!(
493 "Export endpoint port {} is not standard for OTLP (expected 4318/4317)",
494 port
495 )));
496 }
497
498 if url.scheme() == "http" && !url.path().starts_with("/v1/traces") {
500 return Err(CleanroomError::validation_error(format!(
501 "Export endpoint path '{}' does not match OTLP HTTP format '/v1/traces'",
502 url.path()
503 )));
504 }
505 }
506 _ => {
507 return Err(CleanroomError::validation_error(format!(
508 "Export endpoint scheme '{}' is not supported (expected http/https)",
509 url.scheme()
510 )));
511 }
512 }
513
514 Ok(true)
525 }
526
527 pub fn validate_trace_real(&self, assertion: &TraceAssertion) -> Result<TraceValidationResult> {
540 if !self.config.validate_traces {
541 return Err(CleanroomError::validation_error(
542 "Trace validation is disabled in configuration",
543 ));
544 }
545
546 let validation_processor = self.validation_processor.as_ref().ok_or_else(|| {
547 CleanroomError::validation_error(
548 "No validation processor configured for real trace validation",
549 )
550 })?;
551
552 let mut errors = Vec::new();
553 let mut span_results = Vec::new();
554
555 let trace_spans = if let Some(trace_id_str) = &assertion.trace_id {
557 let trace_id = TraceId::from_hex(trace_id_str).map_err(|e| {
559 CleanroomError::validation_error(format!(
560 "Invalid trace ID '{}': {}",
561 trace_id_str, e
562 ))
563 })?;
564 let all_spans = validation_processor.get_spans()?;
566 all_spans
567 .into_iter()
568 .filter(|span| span.span_context.trace_id() == trace_id)
569 .collect()
570 } else {
571 validation_processor.get_spans()?
573 };
574
575 for span_assertion in &assertion.expected_spans {
577 match self.validate_span_real(span_assertion) {
578 Ok(span_result) => {
579 span_results.push(span_result.clone());
580
581 if !span_result.passed {
583 errors.extend(span_result.errors.iter().cloned());
584 }
585 }
586 Err(e) => {
587 errors.push(format!(
588 "Failed to validate span '{}': {}",
589 span_assertion.name, e.message
590 ));
591 span_results.push(SpanValidationResult {
592 passed: false,
593 span_name: span_assertion.name.clone(),
594 errors: vec![e.message.clone()],
595 actual_attributes: HashMap::new(),
596 actual_duration_ms: None,
597 });
598 }
599 }
600 }
601
602 for (parent_name, child_name) in &assertion.parent_child_relationships {
604 if parent_name.is_empty() || child_name.is_empty() {
605 errors
606 .push("Parent or child span name cannot be empty in relationship".to_string());
607 continue;
608 }
609
610 let parent_spans: Vec<_> = trace_spans
612 .iter()
613 .filter(|span| span.name == parent_name.as_str())
614 .collect();
615
616 let child_spans: Vec<_> = trace_spans
617 .iter()
618 .filter(|span| span.name == child_name.as_str())
619 .collect();
620
621 if parent_spans.is_empty() {
622 errors.push(format!("Parent span '{}' not found in trace", parent_name));
623 }
624
625 if child_spans.is_empty() {
626 errors.push(format!("Child span '{}' not found in trace", child_name));
627 }
628
629 if !parent_spans.is_empty() && !child_spans.is_empty() {
631 let mut orphaned_children = Vec::new();
633
634 for child_span in &child_spans {
635 let valid_parent =
637 if child_span.parent_span_id != opentelemetry::trace::SpanId::INVALID {
638 parent_spans.iter().any(|parent_span| {
639 parent_span.span_context.span_id() == child_span.parent_span_id
640 })
641 } else {
642 false };
644
645 if !valid_parent {
646 orphaned_children.push(child_span);
647 }
648 }
649
650 for orphaned_child in orphaned_children {
652 errors.push(format!(
653 "Child span '{}' has invalid or missing parent_span_id (expected one of: {})",
654 orphaned_child.name,
655 parent_spans.iter()
656 .map(|p| format!("{:?}", p.span_context.span_id()))
657 .collect::<Vec<_>>()
658 .join(", ")
659 ));
660 }
661 }
662 }
663
664 if assertion.complete {
666 let expected_count = assertion.expected_spans.len();
667 let actual_count = span_results.len();
668
669 if actual_count != expected_count {
670 errors.push(format!(
671 "Trace completeness check failed: expected {} spans, found {}",
672 expected_count, actual_count
673 ));
674 }
675 }
676
677 Ok(TraceValidationResult {
678 passed: errors.is_empty(),
679 trace_id: assertion.trace_id.clone(),
680 expected_span_count: assertion.expected_spans.len(),
681 actual_span_count: span_results.len(),
682 errors,
683 span_results,
684 })
685 }
686
687 pub fn validate_performance_overhead(
695 &self,
696 baseline_duration_ms: f64,
697 with_telemetry_duration_ms: f64,
698 ) -> Result<bool> {
699 if !self.config.validate_performance {
700 return Err(CleanroomError::validation_error(
701 "Performance validation is disabled in configuration",
702 ));
703 }
704
705 let overhead_ms = with_telemetry_duration_ms - baseline_duration_ms;
706
707 if overhead_ms > self.config.max_overhead_ms {
708 return Err(CleanroomError::validation_error(format!(
709 "Telemetry performance overhead {}ms exceeds maximum allowed {}ms",
710 overhead_ms, self.config.max_overhead_ms
711 )));
712 }
713
714 Ok(true)
715 }
716
717 pub fn config(&self) -> &OtelValidationConfig {
719 &self.config
720 }
721
722 pub fn set_config(&mut self, config: OtelValidationConfig) {
724 self.config = config;
725 }
726}
727
728impl Default for OtelValidator {
729 fn default() -> Self {
730 Self::new()
731 }
732}