Skip to main content

fast_telemetry/export/
otlp.rs

1//! OTLP (OpenTelemetry Protocol) export for fast-telemetry metrics.
2//!
3//! Converts fast-telemetry metric types into OTLP protobuf messages for export
4//! via HTTP/protobuf to any OTLP-compatible collector.
5//!
6//! All exports use **cumulative temporality** — values represent running totals
7//! since process start. No state tracking is required between export cycles.
8
9use crate::exp_buckets::ExpBucketsSnapshot;
10use crate::{
11    Counter, Distribution, DynamicCounter, DynamicDistribution, DynamicGauge, DynamicGaugeI64,
12    DynamicHistogram, Gauge, GaugeF64, Histogram, LabelEnum, LabeledCounter, LabeledGauge,
13    LabeledHistogram,
14};
15
16/// Re-export proto types so downstream crates (and the derive macro) can reference them.
17pub mod pb {
18    pub use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
19    pub use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
20    pub use opentelemetry_proto::tonic::common::v1::{
21        AnyValue, InstrumentationScope, KeyValue, any_value,
22    };
23    pub use opentelemetry_proto::tonic::metrics::v1::{
24        self, AggregationTemporality, ExponentialHistogram as OtlpExpHistogram,
25        ExponentialHistogramDataPoint, Gauge as OtlpGauge, Histogram as OtlpHistogram,
26        HistogramDataPoint, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, Sum,
27        exponential_histogram_data_point, metric, number_data_point,
28    };
29    pub use opentelemetry_proto::tonic::resource::v1::Resource;
30    pub use opentelemetry_proto::tonic::trace::v1::{
31        ResourceSpans, ScopeSpans, Span as OtlpSpan, Status as OtlpStatus,
32        span::{Event as OtlpEvent, SpanKind as OtlpSpanKind},
33        status::StatusCode as OtlpStatusCode,
34    };
35}
36
37/// Trait for exporting a metric as OTLP protobuf `Metric` messages.
38///
39/// Each implementation appends one or more `Metric` to the output vec.
40/// Uses cumulative temporality — no state tracking needed.
41///
42/// `time_unix_nano` is a pre-computed timestamp (via [`now_nanos`]) shared
43/// across all data points in one export cycle for consistency.
44pub trait OtlpExport {
45    fn export_otlp(
46        &self,
47        metrics: &mut Vec<pb::Metric>,
48        name: &str,
49        description: &str,
50        time_unix_nano: u64,
51    );
52}
53
54// ============================================================================
55// Helpers
56// ============================================================================
57
58fn make_kv(key: &str, value: &str) -> pb::KeyValue {
59    pb::KeyValue {
60        key: key.to_string(),
61        value: Some(pb::AnyValue {
62            value: Some(pb::any_value::Value::StringValue(value.to_string())),
63        }),
64    }
65}
66
67fn pairs_to_attributes(pairs: &[(String, String)]) -> Vec<pb::KeyValue> {
68    pairs.iter().map(|(k, v)| make_kv(k, v)).collect()
69}
70
71fn label_to_attribute<L: LabelEnum>(label: L) -> pb::KeyValue {
72    make_kv(L::LABEL_NAME, label.variant_name())
73}
74
75/// Returns the current time as nanoseconds since the Unix epoch.
76///
77/// Use this to compute a shared timestamp for a batch of OTLP exports.
78pub fn now_nanos() -> u64 {
79    std::time::SystemTime::now()
80        .duration_since(std::time::UNIX_EPOCH)
81        .unwrap_or_default()
82        .as_nanos() as u64
83}
84
85fn int_data_point(
86    value: i64,
87    attributes: Vec<pb::KeyValue>,
88    time_unix_nano: u64,
89) -> pb::NumberDataPoint {
90    pb::NumberDataPoint {
91        attributes,
92        time_unix_nano,
93        value: Some(pb::number_data_point::Value::AsInt(value)),
94        ..Default::default()
95    }
96}
97
98fn double_data_point(
99    value: f64,
100    attributes: Vec<pb::KeyValue>,
101    time_unix_nano: u64,
102) -> pb::NumberDataPoint {
103    pb::NumberDataPoint {
104        attributes,
105        time_unix_nano,
106        value: Some(pb::number_data_point::Value::AsDouble(value)),
107        ..Default::default()
108    }
109}
110
111/// Convert cumulative bucket counts (as returned by `buckets_cumulative()`) to
112/// OTLP's per-bucket counts and explicit bounds.
113///
114/// OTLP expects non-cumulative bucket counts and omits the +Inf bound from
115/// `explicit_bounds` (it's implied by the final bucket).
116fn cumulative_to_otlp_buckets(cumulative: &[(u64, u64)]) -> (Vec<u64>, Vec<f64>) {
117    cumulative_to_otlp_buckets_iter(cumulative.iter().copied())
118}
119
120fn cumulative_to_otlp_buckets_iter(
121    cumulative: impl IntoIterator<Item = (u64, u64)>,
122) -> (Vec<u64>, Vec<f64>) {
123    let iter = cumulative.into_iter();
124    let (lower, _) = iter.size_hint();
125    let mut bucket_counts = Vec::with_capacity(lower);
126    let mut explicit_bounds = Vec::with_capacity(lower.saturating_sub(1));
127    let mut prev = 0u64;
128
129    for (bound, cum_count) in iter {
130        bucket_counts.push(cum_count.saturating_sub(prev));
131        prev = cum_count;
132        if bound != u64::MAX {
133            explicit_bounds.push(bound as f64);
134        }
135    }
136
137    (bucket_counts, explicit_bounds)
138}
139
140/// Build an OTLP `Resource` with a service name and optional extra attributes.
141pub fn build_resource(service_name: &str, attrs: &[(&str, &str)]) -> pb::Resource {
142    let mut attributes = vec![make_kv("service.name", service_name)];
143    for (k, v) in attrs {
144        attributes.push(make_kv(k, v));
145    }
146    pb::Resource {
147        attributes,
148        ..Default::default()
149    }
150}
151
152/// Wrap a vec of `Metric` into a full `ExportMetricsServiceRequest`.
153///
154/// Takes the resource by reference and clones it into the request.
155pub fn build_export_request(
156    resource: &pb::Resource,
157    scope_name: &str,
158    metrics: Vec<pb::Metric>,
159) -> pb::ExportMetricsServiceRequest {
160    pb::ExportMetricsServiceRequest {
161        resource_metrics: vec![pb::ResourceMetrics {
162            resource: Some(resource.clone()),
163            scope_metrics: vec![pb::ScopeMetrics {
164                scope: Some(pb::InstrumentationScope {
165                    name: scope_name.to_string(),
166                    ..Default::default()
167                }),
168                metrics,
169                ..Default::default()
170            }],
171            ..Default::default()
172        }],
173    }
174}
175
176// ============================================================================
177// OtlpExport implementations
178// ============================================================================
179
180impl OtlpExport for Counter {
181    fn export_otlp(
182        &self,
183        metrics: &mut Vec<pb::Metric>,
184        name: &str,
185        description: &str,
186        time_unix_nano: u64,
187    ) {
188        let value = self.sum() as i64;
189        metrics.push(pb::Metric {
190            name: name.to_string(),
191            description: description.to_string(),
192            data: Some(pb::metric::Data::Sum(pb::Sum {
193                // Counter uses AtomicIsize — callers can add negative values,
194                // so we cannot guarantee monotonicity.
195                data_points: vec![int_data_point(value, Vec::new(), time_unix_nano)],
196                aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
197                is_monotonic: false,
198            })),
199            ..Default::default()
200        });
201    }
202}
203
204impl OtlpExport for Gauge {
205    fn export_otlp(
206        &self,
207        metrics: &mut Vec<pb::Metric>,
208        name: &str,
209        description: &str,
210        time_unix_nano: u64,
211    ) {
212        metrics.push(pb::Metric {
213            name: name.to_string(),
214            description: description.to_string(),
215            data: Some(pb::metric::Data::Gauge(pb::OtlpGauge {
216                data_points: vec![int_data_point(self.get(), Vec::new(), time_unix_nano)],
217            })),
218            ..Default::default()
219        });
220    }
221}
222
223impl OtlpExport for GaugeF64 {
224    fn export_otlp(
225        &self,
226        metrics: &mut Vec<pb::Metric>,
227        name: &str,
228        description: &str,
229        time_unix_nano: u64,
230    ) {
231        metrics.push(pb::Metric {
232            name: name.to_string(),
233            description: description.to_string(),
234            data: Some(pb::metric::Data::Gauge(pb::OtlpGauge {
235                data_points: vec![double_data_point(self.get(), Vec::new(), time_unix_nano)],
236            })),
237            ..Default::default()
238        });
239    }
240}
241
242impl OtlpExport for Histogram {
243    fn export_otlp(
244        &self,
245        metrics: &mut Vec<pb::Metric>,
246        name: &str,
247        description: &str,
248        time_unix_nano: u64,
249    ) {
250        let cumulative = self.buckets_cumulative();
251        let count = self.count();
252        let sum = self.sum();
253        let (bucket_counts, explicit_bounds) = cumulative_to_otlp_buckets(&cumulative);
254
255        metrics.push(pb::Metric {
256            name: name.to_string(),
257            description: description.to_string(),
258            data: Some(pb::metric::Data::Histogram(pb::OtlpHistogram {
259                data_points: vec![pb::HistogramDataPoint {
260                    time_unix_nano,
261                    count,
262                    sum: Some(sum as f64),
263                    bucket_counts,
264                    explicit_bounds,
265                    ..Default::default()
266                }],
267                aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
268            })),
269            ..Default::default()
270        });
271    }
272}
273
274/// Build an OTLP ExponentialHistogramDataPoint from an ExpBucketsSnapshot.
275fn exp_histogram_data_point(
276    snap: &ExpBucketsSnapshot,
277    attributes: Vec<pb::KeyValue>,
278    time_unix_nano: u64,
279) -> pb::ExponentialHistogramDataPoint {
280    // Find the range of non-zero positive buckets to compact the array.
281    let mut first_nonzero: Option<usize> = None;
282    let mut last_nonzero: Option<usize> = None;
283    for (i, &c) in snap.positive.iter().enumerate() {
284        if c > 0 {
285            if first_nonzero.is_none() {
286                first_nonzero = Some(i);
287            }
288            last_nonzero = Some(i);
289        }
290    }
291
292    let positive = match (first_nonzero, last_nonzero) {
293        (Some(first), Some(last)) => {
294            let bucket_counts: Vec<u64> = snap.positive[first..=last].to_vec();
295            Some(pb::exponential_histogram_data_point::Buckets {
296                offset: first as i32,
297                bucket_counts,
298            })
299        }
300        _ => None,
301    };
302
303    pb::ExponentialHistogramDataPoint {
304        attributes,
305        time_unix_nano,
306        count: snap.count,
307        sum: Some(snap.sum as f64),
308        scale: 0, // base-2
309        zero_count: snap.zero_count,
310        positive,
311        negative: None, // u64 values are always non-negative
312        min: snap.min().map(|v| v as f64),
313        max: snap.max().map(|v| v as f64),
314        ..Default::default()
315    }
316}
317
318impl OtlpExport for Distribution {
319    /// Distribution exports as a native OTLP ExponentialHistogram (scale 0, base-2).
320    fn export_otlp(
321        &self,
322        metrics: &mut Vec<pb::Metric>,
323        name: &str,
324        description: &str,
325        time_unix_nano: u64,
326    ) {
327        let snap = self.buckets_snapshot();
328        let dp = exp_histogram_data_point(&snap, Vec::new(), time_unix_nano);
329
330        metrics.push(pb::Metric {
331            name: name.to_string(),
332            description: description.to_string(),
333            data: Some(pb::metric::Data::ExponentialHistogram(
334                pb::OtlpExpHistogram {
335                    data_points: vec![dp],
336                    aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
337                },
338            )),
339            ..Default::default()
340        });
341    }
342}
343
344// ============================================================================
345// Labeled metric implementations
346// ============================================================================
347
348impl<L: LabelEnum> OtlpExport for LabeledCounter<L> {
349    fn export_otlp(
350        &self,
351        metrics: &mut Vec<pb::Metric>,
352        name: &str,
353        description: &str,
354        time_unix_nano: u64,
355    ) {
356        let data_points: Vec<_> = self
357            .iter()
358            .map(|(label, count)| {
359                int_data_point(
360                    count as i64,
361                    vec![label_to_attribute(label)],
362                    time_unix_nano,
363                )
364            })
365            .collect();
366
367        metrics.push(pb::Metric {
368            name: name.to_string(),
369            description: description.to_string(),
370            data: Some(pb::metric::Data::Sum(pb::Sum {
371                data_points,
372                aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
373                is_monotonic: false,
374            })),
375            ..Default::default()
376        });
377    }
378}
379
380impl<L: LabelEnum> OtlpExport for LabeledGauge<L> {
381    fn export_otlp(
382        &self,
383        metrics: &mut Vec<pb::Metric>,
384        name: &str,
385        description: &str,
386        time_unix_nano: u64,
387    ) {
388        let data_points: Vec<_> = self
389            .iter()
390            .map(|(label, value)| {
391                int_data_point(value, vec![label_to_attribute(label)], time_unix_nano)
392            })
393            .collect();
394
395        metrics.push(pb::Metric {
396            name: name.to_string(),
397            description: description.to_string(),
398            data: Some(pb::metric::Data::Gauge(pb::OtlpGauge { data_points })),
399            ..Default::default()
400        });
401    }
402}
403
404impl<L: LabelEnum> OtlpExport for LabeledHistogram<L> {
405    fn export_otlp(
406        &self,
407        metrics: &mut Vec<pb::Metric>,
408        name: &str,
409        description: &str,
410        time_unix_nano: u64,
411    ) {
412        let mut data_points = Vec::new();
413
414        for (label, buckets, sum, count) in self.iter() {
415            let attrs = vec![label_to_attribute(label)];
416            let (bucket_counts, explicit_bounds) = cumulative_to_otlp_buckets(&buckets);
417
418            data_points.push(pb::HistogramDataPoint {
419                attributes: attrs,
420                time_unix_nano,
421                count,
422                sum: Some(sum as f64),
423                bucket_counts,
424                explicit_bounds,
425                ..Default::default()
426            });
427        }
428
429        metrics.push(pb::Metric {
430            name: name.to_string(),
431            description: description.to_string(),
432            data: Some(pb::metric::Data::Histogram(pb::OtlpHistogram {
433                data_points,
434                aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
435            })),
436            ..Default::default()
437        });
438    }
439}
440
441// ============================================================================
442// Dynamic metric implementations
443// ============================================================================
444
445impl OtlpExport for DynamicCounter {
446    fn export_otlp(
447        &self,
448        metrics: &mut Vec<pb::Metric>,
449        name: &str,
450        description: &str,
451        time_unix_nano: u64,
452    ) {
453        let mut data_points = Vec::new();
454        self.visit_series(|pairs, count| {
455            data_points.push(int_data_point(
456                count as i64,
457                pairs_to_attributes(pairs),
458                time_unix_nano,
459            ));
460        });
461
462        if data_points.is_empty() {
463            return;
464        }
465
466        metrics.push(pb::Metric {
467            name: name.to_string(),
468            description: description.to_string(),
469            data: Some(pb::metric::Data::Sum(pb::Sum {
470                data_points,
471                aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
472                is_monotonic: false,
473            })),
474            ..Default::default()
475        });
476    }
477}
478
479impl OtlpExport for DynamicGauge {
480    fn export_otlp(
481        &self,
482        metrics: &mut Vec<pb::Metric>,
483        name: &str,
484        description: &str,
485        time_unix_nano: u64,
486    ) {
487        let mut data_points = Vec::new();
488        self.visit_series(|pairs, value| {
489            data_points.push(double_data_point(
490                value,
491                pairs_to_attributes(pairs),
492                time_unix_nano,
493            ));
494        });
495
496        if data_points.is_empty() {
497            return;
498        }
499
500        metrics.push(pb::Metric {
501            name: name.to_string(),
502            description: description.to_string(),
503            data: Some(pb::metric::Data::Gauge(pb::OtlpGauge { data_points })),
504            ..Default::default()
505        });
506    }
507}
508
509impl OtlpExport for DynamicGaugeI64 {
510    fn export_otlp(
511        &self,
512        metrics: &mut Vec<pb::Metric>,
513        name: &str,
514        description: &str,
515        time_unix_nano: u64,
516    ) {
517        let mut data_points = Vec::new();
518        self.visit_series(|pairs, value| {
519            data_points.push(int_data_point(
520                value,
521                pairs_to_attributes(pairs),
522                time_unix_nano,
523            ));
524        });
525
526        if data_points.is_empty() {
527            return;
528        }
529
530        metrics.push(pb::Metric {
531            name: name.to_string(),
532            description: description.to_string(),
533            data: Some(pb::metric::Data::Gauge(pb::OtlpGauge { data_points })),
534            ..Default::default()
535        });
536    }
537}
538
539impl OtlpExport for DynamicHistogram {
540    fn export_otlp(
541        &self,
542        metrics: &mut Vec<pb::Metric>,
543        name: &str,
544        description: &str,
545        time_unix_nano: u64,
546    ) {
547        let mut data_points = Vec::new();
548
549        self.visit_series(|pairs, series| {
550            let (bucket_counts, explicit_bounds) =
551                cumulative_to_otlp_buckets_iter(series.buckets_cumulative_iter());
552
553            data_points.push(pb::HistogramDataPoint {
554                attributes: pairs_to_attributes(pairs),
555                time_unix_nano,
556                count: series.count(),
557                sum: Some(series.sum() as f64),
558                bucket_counts,
559                explicit_bounds,
560                ..Default::default()
561            });
562        });
563
564        if data_points.is_empty() {
565            return;
566        }
567
568        metrics.push(pb::Metric {
569            name: name.to_string(),
570            description: description.to_string(),
571            data: Some(pb::metric::Data::Histogram(pb::OtlpHistogram {
572                data_points,
573                aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
574            })),
575            ..Default::default()
576        });
577    }
578}
579
580impl OtlpExport for DynamicDistribution {
581    /// Exports as native OTLP ExponentialHistogram (scale 0, base-2) per label set.
582    fn export_otlp(
583        &self,
584        metrics: &mut Vec<pb::Metric>,
585        name: &str,
586        description: &str,
587        time_unix_nano: u64,
588    ) {
589        let mut data_points = Vec::new();
590
591        self.visit_series(|pairs, _count, _sum, snap| {
592            let attrs = pairs_to_attributes(pairs);
593            data_points.push(exp_histogram_data_point(&snap, attrs, time_unix_nano));
594        });
595
596        if data_points.is_empty() {
597            return;
598        }
599
600        metrics.push(pb::Metric {
601            name: name.to_string(),
602            description: description.to_string(),
603            data: Some(pb::metric::Data::ExponentialHistogram(
604                pb::OtlpExpHistogram {
605                    data_points,
606                    aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
607                },
608            )),
609            ..Default::default()
610        });
611    }
612}
613
614// ============================================================================
615// Trace export
616// ============================================================================
617
618use crate::span::{CompletedSpan, SpanKind, SpanStatus, SpanValue};
619
620impl CompletedSpan {
621    /// Convert this completed span into an OTLP protobuf `Span`.
622    pub fn to_otlp(&self) -> pb::OtlpSpan {
623        let kind = match self.kind {
624            SpanKind::Internal => pb::OtlpSpanKind::Internal,
625            SpanKind::Server => pb::OtlpSpanKind::Server,
626            SpanKind::Client => pb::OtlpSpanKind::Client,
627            SpanKind::Producer => pb::OtlpSpanKind::Producer,
628            SpanKind::Consumer => pb::OtlpSpanKind::Consumer,
629        };
630
631        let status = match &self.status {
632            SpanStatus::Unset => None,
633            SpanStatus::Ok => Some(pb::OtlpStatus {
634                code: pb::OtlpStatusCode::Ok as i32,
635                message: String::new(),
636            }),
637            SpanStatus::Error { message } => Some(pb::OtlpStatus {
638                code: pb::OtlpStatusCode::Error as i32,
639                message: message.to_string(),
640            }),
641        };
642
643        let attributes: Vec<pb::KeyValue> = self
644            .attributes
645            .iter()
646            .map(|attr| {
647                let value = match &attr.value {
648                    SpanValue::String(s) => pb::any_value::Value::StringValue(s.to_string()),
649                    SpanValue::I64(v) => pb::any_value::Value::IntValue(*v),
650                    SpanValue::F64(v) => pb::any_value::Value::DoubleValue(*v),
651                    SpanValue::Bool(v) => pb::any_value::Value::BoolValue(*v),
652                    SpanValue::Uuid(u) => pb::any_value::Value::StringValue(u.to_string()),
653                };
654                pb::KeyValue {
655                    key: attr.key.to_string(),
656                    value: Some(pb::AnyValue { value: Some(value) }),
657                }
658            })
659            .collect();
660
661        let events: Vec<pb::OtlpEvent> = self
662            .events
663            .iter()
664            .map(|evt| {
665                let attrs: Vec<pb::KeyValue> = evt
666                    .attributes
667                    .iter()
668                    .map(|a| {
669                        let v = match &a.value {
670                            SpanValue::String(s) => {
671                                pb::any_value::Value::StringValue(s.to_string())
672                            }
673                            SpanValue::I64(v) => pb::any_value::Value::IntValue(*v),
674                            SpanValue::F64(v) => pb::any_value::Value::DoubleValue(*v),
675                            SpanValue::Bool(v) => pb::any_value::Value::BoolValue(*v),
676                            SpanValue::Uuid(u) => pb::any_value::Value::StringValue(u.to_string()),
677                        };
678                        pb::KeyValue {
679                            key: a.key.to_string(),
680                            value: Some(pb::AnyValue { value: Some(v) }),
681                        }
682                    })
683                    .collect();
684                pb::OtlpEvent {
685                    time_unix_nano: evt.time_ns,
686                    name: evt.name.to_string(),
687                    attributes: attrs,
688                    dropped_attributes_count: 0,
689                }
690            })
691            .collect();
692
693        pb::OtlpSpan {
694            trace_id: self.trace_id.as_bytes().to_vec(),
695            span_id: self.span_id.as_bytes().to_vec(),
696            parent_span_id: if self.parent_span_id.is_invalid() {
697                Vec::new()
698            } else {
699                self.parent_span_id.as_bytes().to_vec()
700            },
701            name: self.name.to_string(),
702            kind: kind as i32,
703            start_time_unix_nano: self.start_time_ns,
704            end_time_unix_nano: self.end_time_ns,
705            attributes,
706            events,
707            status,
708            ..Default::default()
709        }
710    }
711}
712
713/// Wrap a vec of OTLP `Span` protos into a full `ExportTraceServiceRequest`.
714///
715/// Takes the resource by reference and clones it into the request.
716pub fn build_trace_export_request(
717    resource: &pb::Resource,
718    scope_name: &str,
719    spans: Vec<pb::OtlpSpan>,
720) -> pb::ExportTraceServiceRequest {
721    pb::ExportTraceServiceRequest {
722        resource_spans: vec![pb::ResourceSpans {
723            resource: Some(resource.clone()),
724            scope_spans: vec![pb::ScopeSpans {
725                scope: Some(pb::InstrumentationScope {
726                    name: scope_name.to_string(),
727                    ..Default::default()
728                }),
729                spans,
730                ..Default::default()
731            }],
732            ..Default::default()
733        }],
734    }
735}
736
737#[cfg(test)]
738mod tests {
739    use super::*;
740
741    fn test_timestamp() -> u64 {
742        1_000_000_000 // fixed timestamp for deterministic tests
743    }
744
745    #[test]
746    fn test_counter_otlp() {
747        let counter = Counter::new(4);
748        counter.add(42);
749
750        let mut metrics = Vec::new();
751        counter.export_otlp(
752            &mut metrics,
753            "test_counter",
754            "A test counter",
755            test_timestamp(),
756        );
757
758        assert_eq!(metrics.len(), 1);
759        assert_eq!(metrics[0].name, "test_counter");
760        assert_eq!(metrics[0].description, "A test counter");
761
762        let data = metrics[0].data.as_ref().expect("missing data");
763        match data {
764            pb::metric::Data::Sum(sum) => {
765                // Counter uses isize (can go negative), so is_monotonic must be false
766                assert!(!sum.is_monotonic);
767                assert_eq!(
768                    sum.aggregation_temporality,
769                    pb::AggregationTemporality::Cumulative as i32
770                );
771                assert_eq!(sum.data_points.len(), 1);
772                assert_eq!(
773                    sum.data_points[0].value,
774                    Some(pb::number_data_point::Value::AsInt(42))
775                );
776                assert_eq!(sum.data_points[0].time_unix_nano, test_timestamp());
777            }
778            _ => panic!("expected Sum, got {:?}", data),
779        }
780    }
781
782    #[test]
783    fn test_gauge_otlp() {
784        let gauge = Gauge::new();
785        gauge.set(-10);
786
787        let mut metrics = Vec::new();
788        gauge.export_otlp(&mut metrics, "test_gauge", "A test gauge", test_timestamp());
789
790        assert_eq!(metrics.len(), 1);
791        match metrics[0].data.as_ref().expect("missing data") {
792            pb::metric::Data::Gauge(g) => {
793                assert_eq!(g.data_points.len(), 1);
794                assert_eq!(
795                    g.data_points[0].value,
796                    Some(pb::number_data_point::Value::AsInt(-10))
797                );
798            }
799            other => panic!("expected Gauge, got {:?}", other),
800        }
801    }
802
803    #[test]
804    fn test_gauge_f64_otlp() {
805        let gauge = GaugeF64::new();
806        gauge.set(3.125);
807
808        let mut metrics = Vec::new();
809        gauge.export_otlp(&mut metrics, "test_gauge_f64", "", test_timestamp());
810
811        match metrics[0].data.as_ref().expect("missing data") {
812            pb::metric::Data::Gauge(g) => {
813                assert_eq!(g.data_points.len(), 1);
814                match g.data_points[0].value {
815                    Some(pb::number_data_point::Value::AsDouble(v)) => {
816                        assert!((v - 3.125).abs() < 1e-10);
817                    }
818                    ref other => panic!("expected AsDouble, got {:?}", other),
819                }
820            }
821            other => panic!("expected Gauge, got {:?}", other),
822        }
823    }
824
825    #[test]
826    fn test_histogram_otlp() {
827        let h = Histogram::new(&[10, 100], 4);
828        h.record(5);
829        h.record(50);
830        h.record(500);
831
832        let mut metrics = Vec::new();
833        h.export_otlp(
834            &mut metrics,
835            "test_hist",
836            "A test histogram",
837            test_timestamp(),
838        );
839
840        assert_eq!(metrics.len(), 1);
841        match metrics[0].data.as_ref().expect("missing data") {
842            pb::metric::Data::Histogram(hist) => {
843                assert_eq!(
844                    hist.aggregation_temporality,
845                    pb::AggregationTemporality::Cumulative as i32
846                );
847                assert_eq!(hist.data_points.len(), 1);
848
849                let dp = &hist.data_points[0];
850                assert_eq!(dp.count, 3);
851                assert_eq!(dp.sum, Some(555.0));
852                assert_eq!(dp.explicit_bounds, vec![10.0, 100.0]);
853                assert_eq!(dp.bucket_counts, vec![1, 1, 1]);
854                assert_eq!(dp.time_unix_nano, test_timestamp());
855            }
856            other => panic!("expected Histogram, got {:?}", other),
857        }
858    }
859
860    #[test]
861    fn test_distribution_otlp() {
862        let dist = Distribution::new(4);
863        dist.record(100);
864        dist.record(200);
865        dist.record(300);
866
867        let mut metrics = Vec::new();
868        dist.export_otlp(
869            &mut metrics,
870            "test_dist",
871            "A distribution",
872            test_timestamp(),
873        );
874
875        assert_eq!(metrics.len(), 1);
876        assert_eq!(metrics[0].name, "test_dist");
877
878        match metrics[0].data.as_ref().expect("missing data") {
879            pb::metric::Data::ExponentialHistogram(hist) => {
880                assert_eq!(
881                    hist.aggregation_temporality,
882                    pb::AggregationTemporality::Cumulative as i32
883                );
884                assert_eq!(hist.data_points.len(), 1);
885
886                let dp = &hist.data_points[0];
887                assert_eq!(dp.count, 3);
888                assert_eq!(dp.sum, Some(600.0));
889                assert_eq!(dp.scale, 0);
890                assert_eq!(dp.zero_count, 0);
891                assert_eq!(dp.time_unix_nano, test_timestamp());
892                // positive buckets should be set
893                assert!(dp.positive.is_some());
894                let positive = dp.positive.as_ref().expect("positive buckets");
895                // 100 -> bucket 6, 200 -> bucket 7, 300 -> bucket 8
896                assert!(!positive.bucket_counts.is_empty());
897            }
898            other => panic!("expected ExponentialHistogram, got {:?}", other),
899        }
900    }
901
902    #[test]
903    fn test_dynamic_counter_otlp() {
904        let counter = DynamicCounter::new(4);
905        counter.add(&[("env", "prod"), ("region", "us")], 10);
906        counter.add(&[("env", "staging"), ("region", "eu")], 5);
907
908        let mut metrics = Vec::new();
909        counter.export_otlp(&mut metrics, "requests", "Request count", test_timestamp());
910
911        assert_eq!(metrics.len(), 1);
912        match metrics[0].data.as_ref().expect("missing data") {
913            pb::metric::Data::Sum(sum) => {
914                assert!(!sum.is_monotonic);
915                assert_eq!(sum.data_points.len(), 2);
916                for dp in &sum.data_points {
917                    assert_eq!(dp.attributes.len(), 2);
918                }
919            }
920            other => panic!("expected Sum, got {:?}", other),
921        }
922    }
923
924    #[test]
925    fn test_build_export_request() {
926        let resource = build_resource("test-service", &[("version", "1.0")]);
927        let counter = Counter::new(4);
928        counter.add(1);
929
930        let mut metrics = Vec::new();
931        counter.export_otlp(&mut metrics, "my_counter", "", test_timestamp());
932
933        let request = build_export_request(&resource, "fast-telemetry", metrics);
934
935        assert_eq!(request.resource_metrics.len(), 1);
936        let rm = &request.resource_metrics[0];
937        let res = rm.resource.as_ref().expect("missing resource");
938        assert_eq!(res.attributes.len(), 2); // service.name + version
939        assert_eq!(res.attributes[0].key, "service.name");
940
941        assert_eq!(rm.scope_metrics.len(), 1);
942        let sm = &rm.scope_metrics[0];
943        let scope = sm.scope.as_ref().expect("missing scope");
944        assert_eq!(scope.name, "fast-telemetry");
945        assert_eq!(sm.metrics.len(), 1);
946    }
947
948    #[test]
949    fn test_make_kv() {
950        let kv = make_kv("foo", "bar");
951        assert_eq!(kv.key, "foo");
952        match kv
953            .value
954            .expect("missing value")
955            .value
956            .expect("missing inner")
957        {
958            pb::any_value::Value::StringValue(s) => assert_eq!(s, "bar"),
959            other => panic!("expected StringValue, got {:?}", other),
960        }
961    }
962
963    // -- Labeled metric tests --
964
965    #[derive(Copy, Clone, Debug)]
966    enum TestLabel {
967        A,
968        B,
969        C,
970    }
971
972    impl crate::LabelEnum for TestLabel {
973        const CARDINALITY: usize = 3;
974        const LABEL_NAME: &'static str = "test";
975
976        fn as_index(self) -> usize {
977            self as usize
978        }
979        fn from_index(index: usize) -> Self {
980            match index {
981                0 => Self::A,
982                1 => Self::B,
983                _ => Self::C,
984            }
985        }
986        fn variant_name(self) -> &'static str {
987            match self {
988                Self::A => "a",
989                Self::B => "b",
990                Self::C => "c",
991            }
992        }
993    }
994
995    #[test]
996    fn test_labeled_counter_otlp() {
997        let counter = LabeledCounter::<TestLabel>::new(4);
998        counter.add(TestLabel::A, 10);
999        counter.add(TestLabel::B, 20);
1000
1001        let mut metrics = Vec::new();
1002        counter.export_otlp(
1003            &mut metrics,
1004            "labeled_counter",
1005            "By label",
1006            test_timestamp(),
1007        );
1008
1009        assert_eq!(metrics.len(), 1);
1010        match metrics[0].data.as_ref().expect("missing data") {
1011            pb::metric::Data::Sum(sum) => {
1012                assert!(!sum.is_monotonic);
1013                assert_eq!(sum.data_points.len(), 3); // A, B, C (all variants exported)
1014                // Find the data point for label A
1015                let dp_a = sum.data_points.iter().find(|dp| {
1016                    dp.attributes.iter().any(|kv| kv.key == "test" && matches!(&kv.value, Some(v) if matches!(&v.value, Some(pb::any_value::Value::StringValue(s)) if s == "a")))
1017                }).expect("missing data point for label A");
1018                assert_eq!(dp_a.value, Some(pb::number_data_point::Value::AsInt(10)));
1019            }
1020            other => panic!("expected Sum, got {:?}", other),
1021        }
1022    }
1023
1024    #[test]
1025    fn test_labeled_gauge_otlp() {
1026        let gauge = LabeledGauge::<TestLabel>::new();
1027        gauge.set(TestLabel::A, 42);
1028        gauge.set(TestLabel::C, -5);
1029
1030        let mut metrics = Vec::new();
1031        gauge.export_otlp(&mut metrics, "labeled_gauge", "By label", test_timestamp());
1032
1033        assert_eq!(metrics.len(), 1);
1034        match metrics[0].data.as_ref().expect("missing data") {
1035            pb::metric::Data::Gauge(g) => {
1036                assert_eq!(g.data_points.len(), 3);
1037            }
1038            other => panic!("expected Gauge, got {:?}", other),
1039        }
1040    }
1041
1042    #[test]
1043    fn test_labeled_histogram_otlp() {
1044        let h = LabeledHistogram::<TestLabel>::new(&[10, 100], 4);
1045        h.record(TestLabel::A, 5);
1046        h.record(TestLabel::A, 50);
1047        h.record(TestLabel::B, 500);
1048
1049        let mut metrics = Vec::new();
1050        h.export_otlp(&mut metrics, "labeled_hist", "By label", test_timestamp());
1051
1052        assert_eq!(metrics.len(), 1);
1053        match metrics[0].data.as_ref().expect("missing data") {
1054            pb::metric::Data::Histogram(hist) => {
1055                assert_eq!(
1056                    hist.aggregation_temporality,
1057                    pb::AggregationTemporality::Cumulative as i32
1058                );
1059                assert_eq!(hist.data_points.len(), 3); // all variants
1060                // Each data point should have a label attribute
1061                for dp in &hist.data_points {
1062                    assert_eq!(dp.attributes.len(), 1);
1063                    assert_eq!(dp.attributes[0].key, "test");
1064                    assert_eq!(dp.time_unix_nano, test_timestamp());
1065                }
1066            }
1067            other => panic!("expected Histogram, got {:?}", other),
1068        }
1069    }
1070
1071    // -- Dynamic metric tests --
1072
1073    #[test]
1074    fn test_dynamic_gauge_otlp() {
1075        let gauge = DynamicGauge::new(4);
1076        gauge.set(&[("host", "node1")], 3.125);
1077        gauge.set(&[("host", "node2")], 2.72);
1078
1079        let mut metrics = Vec::new();
1080        gauge.export_otlp(
1081            &mut metrics,
1082            "cpu_usage",
1083            "CPU percentage",
1084            test_timestamp(),
1085        );
1086
1087        assert_eq!(metrics.len(), 1);
1088        match metrics[0].data.as_ref().expect("missing data") {
1089            pb::metric::Data::Gauge(g) => {
1090                assert_eq!(g.data_points.len(), 2);
1091                for dp in &g.data_points {
1092                    assert_eq!(dp.attributes.len(), 1);
1093                    assert!(matches!(
1094                        dp.value,
1095                        Some(pb::number_data_point::Value::AsDouble(_))
1096                    ));
1097                }
1098            }
1099            other => panic!("expected Gauge, got {:?}", other),
1100        }
1101    }
1102
1103    #[test]
1104    fn test_dynamic_gauge_i64_otlp() {
1105        let gauge = DynamicGaugeI64::new(4);
1106        gauge.set(&[("region", "us")], 100);
1107        gauge.set(&[("region", "eu")], 200);
1108
1109        let mut metrics = Vec::new();
1110        gauge.export_otlp(
1111            &mut metrics,
1112            "connections",
1113            "Active connections",
1114            test_timestamp(),
1115        );
1116
1117        assert_eq!(metrics.len(), 1);
1118        match metrics[0].data.as_ref().expect("missing data") {
1119            pb::metric::Data::Gauge(g) => {
1120                assert_eq!(g.data_points.len(), 2);
1121                for dp in &g.data_points {
1122                    assert_eq!(dp.attributes.len(), 1);
1123                    assert!(matches!(
1124                        dp.value,
1125                        Some(pb::number_data_point::Value::AsInt(_))
1126                    ));
1127                }
1128            }
1129            other => panic!("expected Gauge, got {:?}", other),
1130        }
1131    }
1132
1133    #[test]
1134    fn test_dynamic_histogram_otlp() {
1135        let h = DynamicHistogram::new(&[10, 100, 1000], 4);
1136        h.record(&[("endpoint", "/api")], 5);
1137        h.record(&[("endpoint", "/api")], 50);
1138        h.record(&[("endpoint", "/health")], 500);
1139
1140        let mut metrics = Vec::new();
1141        h.export_otlp(&mut metrics, "latency", "Request latency", test_timestamp());
1142
1143        assert_eq!(metrics.len(), 1);
1144        match metrics[0].data.as_ref().expect("missing data") {
1145            pb::metric::Data::Histogram(hist) => {
1146                assert_eq!(
1147                    hist.aggregation_temporality,
1148                    pb::AggregationTemporality::Cumulative as i32
1149                );
1150                assert_eq!(hist.data_points.len(), 2); // /api and /health
1151                for dp in &hist.data_points {
1152                    assert_eq!(dp.attributes.len(), 1);
1153                    assert_eq!(dp.attributes[0].key, "endpoint");
1154                    assert_eq!(dp.time_unix_nano, test_timestamp());
1155                    // explicit_bounds should not include +Inf
1156                    assert_eq!(dp.explicit_bounds, vec![10.0, 100.0, 1000.0]);
1157                }
1158            }
1159            other => panic!("expected Histogram, got {:?}", other),
1160        }
1161    }
1162
1163    #[test]
1164    fn test_dynamic_distribution_otlp() {
1165        let dist = DynamicDistribution::new(4);
1166        dist.record(&[("method", "GET")], 100);
1167        dist.record(&[("method", "GET")], 200);
1168        dist.record(&[("method", "POST")], 300);
1169
1170        let mut metrics = Vec::new();
1171        dist.export_otlp(
1172            &mut metrics,
1173            "response_size",
1174            "Size in bytes",
1175            test_timestamp(),
1176        );
1177
1178        assert_eq!(metrics.len(), 1);
1179        assert_eq!(metrics[0].name, "response_size");
1180
1181        match metrics[0].data.as_ref().expect("missing data") {
1182            pb::metric::Data::ExponentialHistogram(hist) => {
1183                assert_eq!(
1184                    hist.aggregation_temporality,
1185                    pb::AggregationTemporality::Cumulative as i32
1186                );
1187                assert_eq!(hist.data_points.len(), 2); // GET and POST
1188                for dp in &hist.data_points {
1189                    assert_eq!(dp.attributes.len(), 1);
1190                    assert_eq!(dp.attributes[0].key, "method");
1191                    assert_eq!(dp.scale, 0);
1192                    assert!(dp.positive.is_some());
1193                }
1194            }
1195            other => panic!("expected ExponentialHistogram, got {:?}", other),
1196        }
1197    }
1198
1199    #[test]
1200    fn test_empty_dynamic_metrics_produce_nothing() {
1201        let counter = DynamicCounter::new(4);
1202        let gauge = DynamicGauge::new(4);
1203        let gauge_i64 = DynamicGaugeI64::new(4);
1204        let hist = DynamicHistogram::new(&[10], 4);
1205        let dist = DynamicDistribution::new(4);
1206
1207        let mut metrics = Vec::new();
1208        let ts = test_timestamp();
1209        counter.export_otlp(&mut metrics, "c", "", ts);
1210        gauge.export_otlp(&mut metrics, "g", "", ts);
1211        gauge_i64.export_otlp(&mut metrics, "gi", "", ts);
1212        hist.export_otlp(&mut metrics, "h", "", ts);
1213        dist.export_otlp(&mut metrics, "d", "", ts);
1214
1215        assert!(
1216            metrics.is_empty(),
1217            "empty dynamic metrics should produce no output"
1218        );
1219    }
1220
1221    #[test]
1222    fn test_cumulative_to_otlp_buckets_helper() {
1223        // Input: cumulative [(10, 1), (100, 3), (u64::MAX, 5)]
1224        // Expected per-bucket: [1, 2, 2], bounds: [10.0, 100.0]
1225        let cumulative = vec![(10, 1), (100, 3), (u64::MAX, 5)];
1226        let (counts, bounds) = cumulative_to_otlp_buckets(&cumulative);
1227        assert_eq!(counts, vec![1, 2, 2]);
1228        assert_eq!(bounds, vec![10.0, 100.0]);
1229    }
1230
1231    // -- Trace export tests --
1232
1233    #[test]
1234    fn test_completed_span_to_otlp() {
1235        use crate::span::{SpanAttribute, SpanEvent, SpanKind, SpanStatus};
1236        use crate::span::{SpanId, TraceId};
1237
1238        let completed = CompletedSpan {
1239            trace_id: TraceId::from_hex("4bf92f3577b34da6a3ce929d0e0e4736").unwrap(),
1240            span_id: SpanId::from_hex("00f067aa0ba902b7").unwrap(),
1241            parent_span_id: SpanId::from_hex("1234567890abcdef").unwrap(),
1242            name: "test_operation".into(),
1243            kind: SpanKind::Server,
1244            start_time_ns: 1_000_000_000,
1245            end_time_ns: 2_000_000_000,
1246            status: SpanStatus::Ok,
1247            attributes: vec![
1248                SpanAttribute::new("http.method", "GET"),
1249                SpanAttribute::new("http.status_code", 200i64),
1250            ],
1251            events: vec![SpanEvent {
1252                name: "processing".into(),
1253                time_ns: 1_500_000_000,
1254                attributes: vec![SpanAttribute::new("step", "validate")],
1255            }],
1256        };
1257
1258        let otlp = completed.to_otlp();
1259
1260        assert_eq!(
1261            otlp.trace_id,
1262            &[
1263                0x4b, 0xf9, 0x2f, 0x35, 0x77, 0xb3, 0x4d, 0xa6, 0xa3, 0xce, 0x92, 0x9d, 0x0e, 0x0e,
1264                0x47, 0x36
1265            ]
1266        );
1267        assert_eq!(
1268            otlp.span_id,
1269            &[0x00, 0xf0, 0x67, 0xaa, 0x0b, 0xa9, 0x02, 0xb7]
1270        );
1271        assert_eq!(
1272            otlp.parent_span_id,
1273            &[0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef]
1274        );
1275        assert_eq!(otlp.name, "test_operation");
1276        assert_eq!(otlp.kind, pb::OtlpSpanKind::Server as i32);
1277        assert_eq!(otlp.start_time_unix_nano, 1_000_000_000);
1278        assert_eq!(otlp.end_time_unix_nano, 2_000_000_000);
1279
1280        // Status
1281        let status = otlp.status.unwrap();
1282        assert_eq!(status.code, pb::OtlpStatusCode::Ok as i32);
1283
1284        // Attributes
1285        assert_eq!(otlp.attributes.len(), 2);
1286        assert_eq!(otlp.attributes[0].key, "http.method");
1287        assert_eq!(otlp.attributes[1].key, "http.status_code");
1288
1289        // Events
1290        assert_eq!(otlp.events.len(), 1);
1291        assert_eq!(otlp.events[0].name, "processing");
1292        assert_eq!(otlp.events[0].time_unix_nano, 1_500_000_000);
1293        assert_eq!(otlp.events[0].attributes.len(), 1);
1294    }
1295
1296    #[test]
1297    fn test_completed_span_root_has_empty_parent() {
1298        use crate::span::{SpanId, TraceId};
1299
1300        let completed = CompletedSpan {
1301            trace_id: TraceId::random(),
1302            span_id: SpanId::random(),
1303            parent_span_id: SpanId::INVALID,
1304            name: "root".into(),
1305            kind: SpanKind::Server,
1306            start_time_ns: 1_000_000_000,
1307            end_time_ns: 2_000_000_000,
1308            status: SpanStatus::Unset,
1309            attributes: Vec::new(),
1310            events: Vec::new(),
1311        };
1312
1313        let otlp = completed.to_otlp();
1314        assert!(
1315            otlp.parent_span_id.is_empty(),
1316            "root span should have empty parent_span_id"
1317        );
1318        assert!(otlp.status.is_none(), "Unset status should map to None");
1319    }
1320
1321    #[test]
1322    fn test_completed_span_error_status() {
1323        use crate::span::{SpanId, TraceId};
1324
1325        let completed = CompletedSpan {
1326            trace_id: TraceId::random(),
1327            span_id: SpanId::random(),
1328            parent_span_id: SpanId::INVALID,
1329            name: "failing_op".into(),
1330            kind: SpanKind::Internal,
1331            start_time_ns: 1_000_000_000,
1332            end_time_ns: 2_000_000_000,
1333            status: SpanStatus::Error {
1334                message: "connection refused".into(),
1335            },
1336            attributes: Vec::new(),
1337            events: Vec::new(),
1338        };
1339
1340        let otlp = completed.to_otlp();
1341        let status = otlp.status.unwrap();
1342        assert_eq!(status.code, pb::OtlpStatusCode::Error as i32);
1343        assert_eq!(status.message, "connection refused");
1344    }
1345
1346    #[test]
1347    fn test_build_trace_export_request() {
1348        use crate::span::{SpanId, TraceId};
1349
1350        let resource = build_resource("test-service", &[("version", "1.0")]);
1351        let completed = CompletedSpan {
1352            trace_id: TraceId::random(),
1353            span_id: SpanId::random(),
1354            parent_span_id: SpanId::INVALID,
1355            name: "test".into(),
1356            kind: SpanKind::Server,
1357            start_time_ns: 1_000_000_000,
1358            end_time_ns: 2_000_000_000,
1359            status: SpanStatus::Ok,
1360            attributes: Vec::new(),
1361            events: Vec::new(),
1362        };
1363
1364        let otlp_span = completed.to_otlp();
1365        let request = build_trace_export_request(&resource, "fast-telemetry", vec![otlp_span]);
1366
1367        assert_eq!(request.resource_spans.len(), 1);
1368        let rs = &request.resource_spans[0];
1369        let res = rs.resource.as_ref().unwrap();
1370        assert_eq!(res.attributes.len(), 2); // service.name + version
1371
1372        assert_eq!(rs.scope_spans.len(), 1);
1373        let ss = &rs.scope_spans[0];
1374        let scope = ss.scope.as_ref().unwrap();
1375        assert_eq!(scope.name, "fast-telemetry");
1376        assert_eq!(ss.spans.len(), 1);
1377    }
1378}