Skip to main content

krishiv_metrics/
lib.rs

1#![forbid(unsafe_code)]
2//! **Beta API**: may change between minor releases.
3//!
4//! OpenTelemetry metrics, traces, and structured log initialization for all Krishiv processes.
5
6pub mod grpc;
7pub mod observability_report;
8
9mod counters;
10mod init;
11
12pub use counters::{KrishivHistogram, KrishivMetrics, current_tracestate, global_metrics};
13pub use init::{
14    MetricsConfig, MetricsError, MetricsHandle, TracerExporter, current_traceparent, init,
15};
16
17#[cfg(test)]
18mod tests {
19    use super::{counters::*, init::*};
20
21    #[test]
22    fn init_noop_does_not_panic() {
23        let _handle = init(MetricsConfig::default()).expect("noop init should succeed");
24    }
25
26    #[test]
27    fn shutdown_does_not_panic() {
28        let handle = init(MetricsConfig::default()).expect("init");
29        handle.shutdown();
30    }
31
32    #[test]
33    fn tracing_span_does_not_panic() {
34        let _handle = init(MetricsConfig::default()).expect("init");
35        let _s = tracing::info_span!("test_span").entered();
36    }
37
38    #[test]
39    fn default_config_service_name() {
40        assert_eq!(MetricsConfig::default().service_name, "krishiv");
41    }
42
43    #[test]
44    fn default_config_otlp_endpoint_is_none() {
45        assert!(MetricsConfig::default().otlp_endpoint.is_none());
46    }
47
48    #[test]
49    fn current_traceparent_no_span_returns_none() {
50        assert_eq!(current_traceparent(), None);
51    }
52
53    #[test]
54    fn current_tracestate_no_span_returns_none() {
55        assert_eq!(current_tracestate(), None);
56    }
57
58    // ── KrishivMetrics counter/gauge increment tests ─────────────────────────
59
60    #[test]
61    fn inc_tasks_submitted_increments_by_one() {
62        let m = KrishivMetrics::default();
63        assert_eq!(m.tasks_submitted(), 0);
64        m.inc_tasks_submitted();
65        assert_eq!(m.tasks_submitted(), 1);
66        m.inc_tasks_submitted();
67        assert_eq!(m.tasks_submitted(), 2);
68    }
69
70    #[test]
71    fn set_tasks_running_stores_value() {
72        let m = KrishivMetrics::default();
73        m.set_tasks_running(5);
74        assert_eq!(m.tasks_running(), 5);
75        m.set_tasks_running(0);
76        assert_eq!(m.tasks_running(), 0);
77    }
78
79    #[test]
80    fn inc_tasks_succeeded_increments() {
81        let m = KrishivMetrics::default();
82        m.inc_tasks_succeeded();
83        m.inc_tasks_succeeded();
84        m.inc_tasks_succeeded();
85        assert_eq!(m.tasks_succeeded(), 3);
86    }
87
88    #[test]
89    fn inc_tasks_failed_increments() {
90        let m = KrishivMetrics::default();
91        m.inc_tasks_failed();
92        assert_eq!(m.tasks_failed(), 1);
93    }
94
95    /// Regression (Wave 4 — Observability & Shutdown): `inc_executor_lost`
96    /// must increment the `executor_lost` counter and the value must be
97    /// rendered as `krishiv_executor_lost_total` in the Prometheus exposition
98    /// (the counter and its renderer line were both added in this wave).
99    #[test]
100    fn inc_executor_lost_increments_and_renders() {
101        let m = KrishivMetrics::default();
102        m.inc_executor_lost();
103        m.inc_executor_lost();
104        assert_eq!(m.executor_lost(), 2);
105
106        let rendered = m.render_prometheus();
107        assert!(
108            rendered.contains("krishiv_executor_lost_total 2"),
109            "expected rendered metrics to include krishiv_executor_lost_total 2, got: {rendered}"
110        );
111    }
112
113    #[test]
114    fn add_shuffle_bytes_written_accumulates() {
115        let m = KrishivMetrics::default();
116        m.add_shuffle_bytes_written(1024);
117        m.add_shuffle_bytes_written(2048);
118        assert_eq!(m.shuffle_bytes_written(), 3072);
119    }
120
121    #[test]
122    fn set_job_queue_depth_stores_value() {
123        let m = KrishivMetrics::default();
124        m.set_job_queue_depth(42);
125        assert_eq!(m.job_queue_depth(), 42);
126    }
127
128    #[test]
129    fn global_metrics_returns_same_instance() {
130        let a = global_metrics();
131        let b = global_metrics();
132        let a_ptr = a as *const KrishivMetrics;
133        let b_ptr = b as *const KrishivMetrics;
134        assert_eq!(a_ptr, b_ptr);
135    }
136
137    // ── Prometheus text format rendering (P0 fix validation) ──────────────────
138
139    /// Verifies the P0 format fix: exactly one HELP + TYPE per metric family.
140    #[test]
141    fn render_prometheus_single_help_type_per_family() {
142        let m = KrishivMetrics::default();
143        m.inc_tasks_submitted();
144        m.inc_tasks_succeeded();
145        m.inc_tasks_failed();
146        let body = m.render_prometheus();
147        // Count HELP lines for krishiv_tasks_total — must be exactly 1.
148        let help_count = body
149            .lines()
150            .filter(|l| l.starts_with("# HELP krishiv_tasks_total"))
151            .count();
152        assert_eq!(
153            help_count, 1,
154            "must have exactly one HELP line per metric family"
155        );
156        // Count TYPE lines for krishiv_tasks_total — must be exactly 1.
157        let type_count = body
158            .lines()
159            .filter(|l| l.starts_with("# TYPE krishiv_tasks_total"))
160            .count();
161        assert_eq!(
162            type_count, 1,
163            "must have exactly one TYPE line per metric family"
164        );
165    }
166
167    #[test]
168    fn render_prometheus_contains_help_and_type_lines() {
169        let m = KrishivMetrics::default();
170        let body = m.render_prometheus();
171        assert!(body.contains("# HELP krishiv_tasks_total"));
172        assert!(body.contains("# TYPE krishiv_tasks_total counter"));
173        assert!(body.contains("# HELP krishiv_tasks_running"));
174        assert!(body.contains("# TYPE krishiv_tasks_running gauge"));
175        assert!(body.contains("# HELP krishiv_shuffle_bytes_written_total"));
176        assert!(body.contains("# TYPE krishiv_shuffle_bytes_written_total counter"));
177        assert!(body.contains("# HELP krishiv_job_queue_depth"));
178        assert!(body.contains("# TYPE krishiv_job_queue_depth gauge"));
179    }
180
181    #[test]
182    fn render_prometheus_reflects_counter_values() {
183        let m = KrishivMetrics::default();
184        m.inc_tasks_submitted();
185        m.inc_tasks_submitted();
186        m.inc_tasks_succeeded();
187        m.inc_tasks_failed();
188        let body = m.render_prometheus();
189        assert!(body.contains("krishiv_tasks_total{status=\"submitted\"} 2"));
190        assert!(body.contains("krishiv_tasks_total{status=\"succeeded\"} 1"));
191        assert!(body.contains("krishiv_tasks_total{status=\"failed\"} 1"));
192    }
193
194    #[test]
195    fn render_prometheus_reflects_gauge_values() {
196        let m = KrishivMetrics::default();
197        m.set_tasks_running(7);
198        m.set_job_queue_depth(3);
199        m.add_shuffle_bytes_written(4096);
200        let body = m.render_prometheus();
201        assert!(body.contains("krishiv_tasks_running 7"));
202        assert!(body.contains("krishiv_job_queue_depth 3"));
203        assert!(body.contains("krishiv_shuffle_bytes_written_total 4096"));
204    }
205
206    #[test]
207    fn render_prometheus_zeroes_for_default() {
208        let m = KrishivMetrics::default();
209        let body = m.render_prometheus();
210        assert!(body.contains("krishiv_tasks_total{status=\"submitted\"} 0"));
211        assert!(body.contains("krishiv_tasks_running 0"));
212        assert!(body.contains("krishiv_shuffle_bytes_written_total 0"));
213        assert!(body.contains("krishiv_job_queue_depth 0"));
214    }
215
216    #[test]
217    fn render_prometheus_ends_with_newline() {
218        let m = KrishivMetrics::default();
219        let body = m.render_prometheus();
220        assert!(body.ends_with('\n'));
221    }
222
223    // ── Labeled metric tests ─────────────────────────────────────────────────
224
225    #[test]
226    fn labeled_checkpoint_epoch_gauge() {
227        let m = KrishivMetrics::default();
228        m.set_checkpoint_epoch("job-a", 5);
229        m.set_checkpoint_epoch("job-b", 12);
230        let body = m.render_prometheus();
231        assert!(body.contains("krishiv_checkpoint_epoch{job_id=\"job-a\"} 5"));
232        assert!(body.contains("krishiv_checkpoint_epoch{job_id=\"job-b\"} 12"));
233    }
234
235    #[test]
236    fn labeled_checkpoint_epoch_counters() {
237        let m = KrishivMetrics::default();
238        m.inc_checkpoint_committed("job-a");
239        m.inc_checkpoint_committed("job-a");
240        m.inc_checkpoint_aborted("job-a");
241        m.inc_checkpoint_failed("job-b");
242        let body = m.render_prometheus();
243        assert!(
244            body.contains(
245                "krishiv_checkpoint_epochs_total{job_id=\"job-a\",status=\"committed\"} 2"
246            )
247        );
248        assert!(
249            body.contains("krishiv_checkpoint_epochs_total{job_id=\"job-a\",status=\"aborted\"} 1")
250        );
251        assert!(
252            body.contains("krishiv_checkpoint_epochs_total{job_id=\"job-b\",status=\"failed\"} 1")
253        );
254    }
255
256    #[test]
257    fn labeled_watermark_gauge() {
258        let m = KrishivMetrics::default();
259        m.set_watermark_ms("stream-job", 1620000000000);
260        let body = m.render_prometheus();
261        assert!(body.contains("krishiv_watermark_ms{job_id=\"stream-job\"} 1620000000000"));
262    }
263
264    #[test]
265    fn labeled_latency_histograms() {
266        let m = KrishivMetrics::default();
267        m.observe_grpc_duration("/krishiv.ExecutorTaskService/LaunchTask", 0.15);
268        m.observe_grpc_duration("/krishiv.ExecutorTaskService/LaunchTask", 0.002);
269
270        m.observe_checkpoint_commit_duration("write_manifest", 0.035);
271        m.observe_checkpoint_commit_duration("fsync", 1.2);
272
273        let body = m.render_prometheus();
274
275        // Verify gRPC call duration histogram
276        assert!(body.contains("krishiv_grpc_call_duration_seconds_count{path=\"/krishiv.ExecutorTaskService/LaunchTask\"} 2"));
277        assert!(body.contains("krishiv_grpc_call_duration_seconds_sum{path=\"/krishiv.ExecutorTaskService/LaunchTask\"} 0.152"));
278        assert!(body.contains("krishiv_grpc_call_duration_seconds_bucket{path=\"/krishiv.ExecutorTaskService/LaunchTask\",le=\"0.005\"} 1"));
279        assert!(body.contains("krishiv_grpc_call_duration_seconds_bucket{path=\"/krishiv.ExecutorTaskService/LaunchTask\",le=\"0.25\"} 2"));
280        assert!(body.contains("krishiv_grpc_call_duration_seconds_bucket{path=\"/krishiv.ExecutorTaskService/LaunchTask\",le=\"+Inf\"} 2"));
281
282        // Verify checkpoint commit duration histogram
283        assert!(body.contains(
284            "krishiv_checkpoint_commit_duration_seconds_count{phase=\"write_manifest\"} 1"
285        ));
286        assert!(body.contains(
287            "krishiv_checkpoint_commit_duration_seconds_sum{phase=\"write_manifest\"} 0.035"
288        ));
289        assert!(body.contains("krishiv_checkpoint_commit_duration_seconds_bucket{phase=\"write_manifest\",le=\"0.05\"} 1"));
290
291        assert!(
292            body.contains("krishiv_checkpoint_commit_duration_seconds_count{phase=\"fsync\"} 1")
293        );
294        assert!(
295            body.contains("krishiv_checkpoint_commit_duration_seconds_sum{phase=\"fsync\"} 1.200")
296        );
297        assert!(body.contains(
298            "krishiv_checkpoint_commit_duration_seconds_bucket{phase=\"fsync\",le=\"2.5\"} 1"
299        ));
300    }
301
302    #[test]
303    fn labeled_source_offset_lag() {
304        let m = KrishivMetrics::default();
305        m.set_source_offset_lag("job-a", "kafka-topic-0", 1500);
306        let body = m.render_prometheus();
307        assert!(body.contains(
308            "krishiv_source_offset_lag{job_id=\"job-a\",source_id=\"kafka-topic-0\"} 1500"
309        ));
310    }
311
312    #[test]
313    fn labeled_task_attempt_counters() {
314        let m = KrishivMetrics::default();
315        m.inc_task_attempt_submitted("job-a", "stage-0");
316        m.inc_task_attempt_submitted("job-a", "stage-0");
317        m.inc_task_attempt_succeeded("job-a", "stage-0");
318        m.inc_task_attempt_failed("job-a", "stage-0");
319        m.inc_task_attempt_retrying("job-a", "stage-0");
320        let body = m.render_prometheus();
321        assert!(body.contains(
322            "krishiv_task_attempts_total{job_id=\"job-a\",stage_id=\"stage-0\",status=\"submitted\"} 2"
323        ));
324        assert!(body.contains(
325            "krishiv_task_attempts_total{job_id=\"job-a\",stage_id=\"stage-0\",status=\"succeeded\"} 1"
326        ));
327        assert!(body.contains(
328            "krishiv_task_attempts_total{job_id=\"job-a\",stage_id=\"stage-0\",status=\"failed\"} 1"
329        ));
330    }
331
332    #[test]
333    fn labeled_executor_slots_gauge() {
334        let m = KrishivMetrics::default();
335        m.set_executor_slots_used("exec-1", 3);
336        m.set_executor_slots_used("exec-2", 7);
337        let body = m.render_prometheus();
338        assert!(body.contains("krishiv_executor_slots_used{executor_id=\"exec-1\"} 3"));
339        assert!(body.contains("krishiv_executor_slots_used{executor_id=\"exec-2\"} 7"));
340    }
341
342    #[test]
343    fn labeled_streaming_rows_counter() {
344        let m = KrishivMetrics::default();
345        m.add_streaming_rows("job-a", "task-0", 100);
346        m.add_streaming_rows("job-a", "task-0", 250);
347        let body = m.render_prometheus();
348        assert!(body.contains(
349            "krishiv_streaming_rows_emitted_total{job_id=\"job-a\",task_id=\"task-0\"} 350"
350        ));
351    }
352
353    #[test]
354    fn labeled_state_backend_gauges() {
355        let m = KrishivMetrics::default();
356        m.set_state_key_count("job-a", 5000);
357        m.set_state_bytes("job-a", 1048576);
358        let body = m.render_prometheus();
359        assert!(body.contains("krishiv_state_key_count{job_id=\"job-a\"} 5000"));
360        assert!(body.contains("krishiv_state_bytes{job_id=\"job-a\"} 1048576"));
361    }
362
363    #[test]
364    fn labeled_shuffle_partition_gauges() {
365        let m = KrishivMetrics::default();
366        m.set_shuffle_partitions("job-a", "stage-1", 3, 7, 1);
367        let body = m.render_prometheus();
368        assert!(body.contains(
369            "krishiv_shuffle_partitions{job_id=\"job-a\",stage_id=\"stage-1\",state=\"pending\"} 3"
370        ));
371        assert!(body.contains(
372            "krishiv_shuffle_partitions{job_id=\"job-a\",stage_id=\"stage-1\",state=\"available\"} 7"
373        ));
374        assert!(body.contains(
375            "krishiv_shuffle_partitions{job_id=\"job-a\",stage_id=\"stage-1\",state=\"failed\"} 1"
376        ));
377    }
378
379    #[test]
380    fn remove_job_cleans_all_labeled_metrics() {
381        let m = KrishivMetrics::default();
382        m.set_checkpoint_epoch("job-a", 1);
383        m.set_watermark_ms("job-a", 1000);
384        m.inc_checkpoint_committed("job-a");
385        m.inc_task_attempt_submitted("job-a", "stage-0");
386        m.set_shuffle_partitions("job-a", "stage-1", 1, 0, 0);
387        m.set_state_key_count("job-a", 42);
388        m.set_state_bytes("job-a", 1024);
389        m.set_source_offset_lag("job-a", "kafka-0", 99);
390        m.add_streaming_rows("job-a", "task-0", 10);
391        m.remove_job("job-a");
392
393        let body = m.render_prometheus();
394        assert!(!body.contains("job-a"), "no job-a metrics after remove");
395        // Global metrics should still exist.
396        assert!(body.contains("krishiv_tasks_total"));
397    }
398
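    // ── traceparent / tracestate generation: no tests in this section; the no-span cases are covered by `current_traceparent_no_span_returns_none` and `current_tracestate_no_span_returns_none` above ──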
400
401    // ── MetricsError Display ────────────────────────────────────────────────
402
403    #[test]
404    fn metrics_error_display_otlp_build() {
405        let err = MetricsError::OtlpBuild("connection refused".into());
406        assert_eq!(
407            err.to_string(),
408            "OTLP exporter build failed: connection refused"
409        );
410    }
411
412    #[test]
413    fn metrics_error_display_subscriber() {
414        let err = MetricsError::Subscriber("already set".into());
415        assert_eq!(err.to_string(), "subscriber init failed: already set");
416    }
417
418    #[test]
419    fn metrics_error_is_std_error() {
420        let err: Box<dyn std::error::Error> = Box::new(MetricsError::OtlpBuild("test".into()));
421        assert!(!err.to_string().is_empty());
422    }
423
424    // ── MetricsConfig custom values ─────────────────────────────────────────
425
426    #[test]
427    fn metrics_config_custom_service_name() {
428        let config = MetricsConfig {
429            service_name: "my-service".into(),
430            ..Default::default()
431        };
432        assert_eq!(config.service_name, "my-service");
433    }
434
435    #[test]
436    fn metrics_config_custom_log_filter() {
437        let config = MetricsConfig {
438            log_filter: Some("debug".into()),
439            ..Default::default()
440        };
441        assert_eq!(config.log_filter.as_deref(), Some("debug"));
442    }
443
444    #[test]
445    fn metrics_config_stdout_exporter() {
446        let config = MetricsConfig {
447            exporter: TracerExporter::Stdout,
448            ..Default::default()
449        };
450        assert!(matches!(config.exporter, TracerExporter::Stdout));
451    }
452
453    #[test]
454    fn metrics_config_otlp_endpoint_some() {
455        let config = MetricsConfig {
456            otlp_endpoint: Some("http://localhost:4317".into()),
457            ..Default::default()
458        };
459        assert_eq!(
460            config.otlp_endpoint.as_deref(),
461            Some("http://localhost:4317")
462        );
463    }
464
465    // ── MetricsHandle noop and shutdown ──────────────────────────────────────
466
467    #[test]
468    fn metrics_handle_noop_creates_valid_handle() {
469        let handle = MetricsHandle::noop();
470        drop(handle);
471    }
472
473    #[test]
474    fn metrics_handle_drop_calls_shutdown() {
475        let handle = init(MetricsConfig::default()).expect("init");
476        drop(handle);
477    }
478
479    #[test]
480    fn init_with_stdout_exporter() {
481        let config = MetricsConfig {
482            exporter: TracerExporter::Stdout,
483            ..Default::default()
484        };
485        let handle = init(config);
486        assert!(handle.is_ok());
487    }
488
489    #[test]
490    fn init_with_custom_filter() {
491        let config = MetricsConfig {
492            log_filter: Some("warn".into()),
493            ..Default::default()
494        };
495        let handle = init(config);
496        assert!(handle.is_ok());
497    }
498
499    #[test]
500    fn init_with_empty_filter_defaults_to_info() {
501        let config = MetricsConfig {
502            log_filter: Some("".into()),
503            ..Default::default()
504        };
505        let _handle = init(config);
506    }
507
508    // ── KrishivMetrics edge cases ───────────────────────────────────────────
509
510    #[test]
511    fn add_shuffle_bytes_written_zero() {
512        let m = KrishivMetrics::default();
513        m.add_shuffle_bytes_written(0);
514        assert_eq!(m.shuffle_bytes_written(), 0);
515    }
516
517    #[test]
518    fn add_shuffle_bytes_written_max_value() {
519        let m = KrishivMetrics::default();
520        m.add_shuffle_bytes_written(u64::MAX);
521        assert_eq!(m.shuffle_bytes_written(), u64::MAX);
522    }
523
524    #[test]
525    fn set_tasks_running_max_value() {
526        let m = KrishivMetrics::default();
527        m.set_tasks_running(u64::MAX);
528        assert_eq!(m.tasks_running(), u64::MAX);
529    }
530
531    #[test]
532    fn set_job_queue_depth_zero() {
533        let m = KrishivMetrics::default();
534        m.set_job_queue_depth(42);
535        m.set_job_queue_depth(0);
536        assert_eq!(m.job_queue_depth(), 0);
537    }
538
539    #[test]
540    fn multiple_counters_accumulate_independently() {
541        let m = KrishivMetrics::default();
542        for _ in 0..100 {
543            m.inc_tasks_submitted();
544        }
545        for _ in 0..50 {
546            m.inc_tasks_succeeded();
547        }
548        for _ in 0..10 {
549            m.inc_tasks_failed();
550        }
551        assert_eq!(m.tasks_submitted(), 100);
552        assert_eq!(m.tasks_succeeded(), 50);
553        assert_eq!(m.tasks_failed(), 10);
554    }
555
556    #[test]
557    fn prometheus_output_is_valid_utf8() {
558        let m = KrishivMetrics::default();
559        m.inc_tasks_submitted();
560        let body = m.render_prometheus();
561        assert!(std::str::from_utf8(body.as_bytes()).is_ok());
562    }
563
564    // ── Global metrics thread safety ────────────────────────────────────────
565
566    #[test]
567    fn global_metrics_thread_safety() {
568        use std::sync::Arc;
569        use std::thread;
570
571        let metrics = Arc::new(KrishivMetrics::default());
572        let handles: Vec<_> = (0..10)
573            .map(|_| {
574                let m = Arc::clone(&metrics);
575                thread::spawn(move || {
576                    for _ in 0..1000 {
577                        m.inc_tasks_submitted();
578                    }
579                })
580            })
581            .collect();
582        for h in handles {
583            h.join().unwrap();
584        }
585        assert_eq!(metrics.tasks_submitted(), 10000);
586    }
587
588    /// Verify that labeled metrics are thread-safe (DashMap + AtomicU64).
589    #[test]
590    fn labeled_metrics_thread_safety() {
591        use std::sync::Arc;
592        use std::thread;
593
594        let metrics = Arc::new(KrishivMetrics::default());
595        let handles: Vec<_> = (0..10)
596            .map(|i| {
597                let m = Arc::clone(&metrics);
598                thread::spawn(move || {
599                    for _ in 0..500 {
600                        m.inc_task_attempt_submitted(&format!("job-{i}"), "stage-0");
601                        m.set_checkpoint_epoch(&format!("job-{i}"), 1);
602                    }
603                })
604            })
605            .collect();
606        for h in handles {
607            h.join().unwrap();
608        }
609        // Verify no crash — concurrent DashMap access should work.
610        let body = metrics.render_prometheus();
611        assert!(body.contains("krishiv_checkpoint_epoch"));
612    }
613
614    // ── deployment_target unit tests ────────────────────────────────────────
615
616    #[test]
617    fn resolved_deployment_target_explicit_config() {
618        let config = MetricsConfig {
619            deployment_target: Some("production".into()),
620            ..MetricsConfig::default()
621        };
622        assert_eq!(config.resolved_deployment_target(), "production");
623    }
624
625    #[test]
626    fn resolved_deployment_target_none_returns_env_or_unknown() {
627        // When no explicit config is given, the function reads the env var or
628        // falls back to "unknown". We verify the documented fallback chain
629        // without mutating the environment (unsafe_code is workspace-forbidden).
630        let config = MetricsConfig {
631            deployment_target: None,
632            ..MetricsConfig::default()
633        };
634        let result = config.resolved_deployment_target();
635        let expected =
636            std::env::var("KRISHIV_DEPLOYMENT_TARGET").unwrap_or_else(|_| "unknown".to_string());
637        assert_eq!(
638            result, expected,
639            "resolved value must match the env var when set, or 'unknown' when absent"
640        );
641    }
642
643    #[test]
644    fn resolved_deployment_target_explicit_beats_any_env() {
645        // When deployment_target is explicitly set, it wins regardless of any
646        // env var — no env mutation needed to test this invariant.
647        let config = MetricsConfig {
648            deployment_target: Some("explicit-wins".into()),
649            ..MetricsConfig::default()
650        };
651        assert_eq!(
652            config.resolved_deployment_target(),
653            "explicit-wins",
654            "explicit config must always override the env var fallback"
655        );
656    }
657
658    #[test]
659    fn inmemory_exporter_captures_spans_after_init() {
660        // Verifies that TracerExporter::InMemory is correctly wired into init()
661        // and that emitted spans reach the exporter's capture buffer.
662        use opentelemetry::trace::Tracer as _;
663        use opentelemetry_sdk::trace::InMemorySpanExporter;
664
665        let exporter = InMemorySpanExporter::default();
666        let config = MetricsConfig {
667            service_name: "span-capture-test".into(),
668            exporter: TracerExporter::InMemory(exporter.clone()),
669            deployment_target: Some("test-cluster".into()),
670            otlp_endpoint: None,
671            log_filter: None,
672        };
673        let handle = init(config).expect("init must succeed with InMemory exporter");
674
675        // Emit a span directly via the provider-local tracer rather than the
676        // global one, which can be replaced by concurrent tests calling init().
677        {
678            use opentelemetry::trace::TracerProvider as _;
679            let tracer = handle.tracer_provider().tracer("capture-test");
680            let span = tracer.start("test-capture-span");
681            drop(span);
682        }
683
684        // Force flush to drain the processor (retry briefly for parallel test runs).
685        let mut spans = Vec::new();
686        for _ in 0..50 {
687            let _ = handle.tracer_provider().force_flush();
688            if let Ok(captured) = exporter.get_finished_spans()
689                && !captured.is_empty()
690            {
691                spans = captured;
692                break;
693            }
694            std::thread::sleep(std::time::Duration::from_millis(10));
695        }
696        let _ = handle.tracer_provider().shutdown();
697        if spans.is_empty()
698            && let Ok(captured) = exporter.get_finished_spans()
699        {
700            spans = captured;
701        }
702        assert!(
703            !spans.is_empty(),
704            "at least one span must be captured by InMemory exporter after init()"
705        );
706        // The deployment.target is passed to the resource builder in init().
707        // Its correctness is validated by the resolved_deployment_target unit tests.
708        // Here we just verify the span name is preserved.
709        assert!(
710            spans.iter().any(|s| s.name.as_ref() == "test-capture-span"),
711            "captured span must have the expected name"
712        );
713    }
714
715    #[tokio::test]
716    #[ignore = "requires live OTLP collector at OTEL_EXPORTER_OTLP_ENDPOINT"]
717    async fn otlp_integration_exports_span() {
718        let endpoint = match std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") {
719            Ok(e) => e,
720            Err(_) => return,
721        };
722        let config = MetricsConfig {
723            service_name: "krishiv-test".into(),
724            otlp_endpoint: Some(endpoint),
725            ..Default::default()
726        };
727        let handle = init(config).expect("metrics init with OTLP endpoint failed");
728        let tracer = opentelemetry::global::tracer("test");
729        {
730            use opentelemetry::trace::Tracer as _;
731            let _span = tracer.start("otlp_integration_test_span");
732        }
733        handle.shutdown();
734    }
735}