1use crate::exp_buckets::ExpBucketsSnapshot;
10use crate::{
11 Counter, Distribution, DynamicCounter, DynamicDistribution, DynamicGauge, DynamicGaugeI64,
12 DynamicHistogram, Gauge, GaugeF64, Histogram, LabelEnum, LabeledCounter, LabeledGauge,
13 LabeledHistogram,
14};
15
16pub mod pb {
18 pub use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
19 pub use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
20 pub use opentelemetry_proto::tonic::common::v1::{
21 AnyValue, InstrumentationScope, KeyValue, any_value,
22 };
23 pub use opentelemetry_proto::tonic::metrics::v1::{
24 self, AggregationTemporality, ExponentialHistogram as OtlpExpHistogram,
25 ExponentialHistogramDataPoint, Gauge as OtlpGauge, Histogram as OtlpHistogram,
26 HistogramDataPoint, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, Sum,
27 exponential_histogram_data_point, metric, number_data_point,
28 };
29 pub use opentelemetry_proto::tonic::resource::v1::Resource;
30 pub use opentelemetry_proto::tonic::trace::v1::{
31 ResourceSpans, ScopeSpans, Span as OtlpSpan, Status as OtlpStatus,
32 span::{Event as OtlpEvent, SpanKind as OtlpSpanKind},
33 status::StatusCode as OtlpStatusCode,
34 };
35}
36
37pub trait OtlpExport {
45 fn export_otlp(
46 &self,
47 metrics: &mut Vec<pb::Metric>,
48 name: &str,
49 description: &str,
50 time_unix_nano: u64,
51 );
52}
53
54fn make_kv(key: &str, value: &str) -> pb::KeyValue {
59 pb::KeyValue {
60 key: key.to_string(),
61 value: Some(pb::AnyValue {
62 value: Some(pb::any_value::Value::StringValue(value.to_string())),
63 }),
64 }
65}
66
67fn pairs_to_attributes(pairs: &[(String, String)]) -> Vec<pb::KeyValue> {
68 pairs.iter().map(|(k, v)| make_kv(k, v)).collect()
69}
70
71fn label_to_attribute<L: LabelEnum>(label: L) -> pb::KeyValue {
72 make_kv(L::LABEL_NAME, label.variant_name())
73}
74
75pub fn now_nanos() -> u64 {
79 std::time::SystemTime::now()
80 .duration_since(std::time::UNIX_EPOCH)
81 .unwrap_or_default()
82 .as_nanos() as u64
83}
84
85fn int_data_point(
86 value: i64,
87 attributes: Vec<pb::KeyValue>,
88 time_unix_nano: u64,
89) -> pb::NumberDataPoint {
90 pb::NumberDataPoint {
91 attributes,
92 time_unix_nano,
93 value: Some(pb::number_data_point::Value::AsInt(value)),
94 ..Default::default()
95 }
96}
97
98fn double_data_point(
99 value: f64,
100 attributes: Vec<pb::KeyValue>,
101 time_unix_nano: u64,
102) -> pb::NumberDataPoint {
103 pb::NumberDataPoint {
104 attributes,
105 time_unix_nano,
106 value: Some(pb::number_data_point::Value::AsDouble(value)),
107 ..Default::default()
108 }
109}
110
111fn cumulative_to_otlp_buckets(cumulative: &[(u64, u64)]) -> (Vec<u64>, Vec<f64>) {
117 cumulative_to_otlp_buckets_iter(cumulative.iter().copied())
118}
119
120fn cumulative_to_otlp_buckets_iter(
121 cumulative: impl IntoIterator<Item = (u64, u64)>,
122) -> (Vec<u64>, Vec<f64>) {
123 let iter = cumulative.into_iter();
124 let (lower, _) = iter.size_hint();
125 let mut bucket_counts = Vec::with_capacity(lower);
126 let mut explicit_bounds = Vec::with_capacity(lower.saturating_sub(1));
127 let mut prev = 0u64;
128
129 for (bound, cum_count) in iter {
130 bucket_counts.push(cum_count.saturating_sub(prev));
131 prev = cum_count;
132 if bound != u64::MAX {
133 explicit_bounds.push(bound as f64);
134 }
135 }
136
137 (bucket_counts, explicit_bounds)
138}
139
140pub fn build_resource(service_name: &str, attrs: &[(&str, &str)]) -> pb::Resource {
142 let mut attributes = vec![make_kv("service.name", service_name)];
143 for (k, v) in attrs {
144 attributes.push(make_kv(k, v));
145 }
146 pb::Resource {
147 attributes,
148 ..Default::default()
149 }
150}
151
152pub fn build_export_request(
156 resource: &pb::Resource,
157 scope_name: &str,
158 metrics: Vec<pb::Metric>,
159) -> pb::ExportMetricsServiceRequest {
160 pb::ExportMetricsServiceRequest {
161 resource_metrics: vec![pb::ResourceMetrics {
162 resource: Some(resource.clone()),
163 scope_metrics: vec![pb::ScopeMetrics {
164 scope: Some(pb::InstrumentationScope {
165 name: scope_name.to_string(),
166 ..Default::default()
167 }),
168 metrics,
169 ..Default::default()
170 }],
171 ..Default::default()
172 }],
173 }
174}
175
176impl OtlpExport for Counter {
181 fn export_otlp(
182 &self,
183 metrics: &mut Vec<pb::Metric>,
184 name: &str,
185 description: &str,
186 time_unix_nano: u64,
187 ) {
188 let value = self.sum() as i64;
189 metrics.push(pb::Metric {
190 name: name.to_string(),
191 description: description.to_string(),
192 data: Some(pb::metric::Data::Sum(pb::Sum {
193 data_points: vec![int_data_point(value, Vec::new(), time_unix_nano)],
196 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
197 is_monotonic: false,
198 })),
199 ..Default::default()
200 });
201 }
202}
203
204impl OtlpExport for Gauge {
205 fn export_otlp(
206 &self,
207 metrics: &mut Vec<pb::Metric>,
208 name: &str,
209 description: &str,
210 time_unix_nano: u64,
211 ) {
212 metrics.push(pb::Metric {
213 name: name.to_string(),
214 description: description.to_string(),
215 data: Some(pb::metric::Data::Gauge(pb::OtlpGauge {
216 data_points: vec![int_data_point(self.get(), Vec::new(), time_unix_nano)],
217 })),
218 ..Default::default()
219 });
220 }
221}
222
223impl OtlpExport for GaugeF64 {
224 fn export_otlp(
225 &self,
226 metrics: &mut Vec<pb::Metric>,
227 name: &str,
228 description: &str,
229 time_unix_nano: u64,
230 ) {
231 metrics.push(pb::Metric {
232 name: name.to_string(),
233 description: description.to_string(),
234 data: Some(pb::metric::Data::Gauge(pb::OtlpGauge {
235 data_points: vec![double_data_point(self.get(), Vec::new(), time_unix_nano)],
236 })),
237 ..Default::default()
238 });
239 }
240}
241
242impl OtlpExport for Histogram {
243 fn export_otlp(
244 &self,
245 metrics: &mut Vec<pb::Metric>,
246 name: &str,
247 description: &str,
248 time_unix_nano: u64,
249 ) {
250 let cumulative = self.buckets_cumulative();
251 let count = self.count();
252 let sum = self.sum();
253 let (bucket_counts, explicit_bounds) = cumulative_to_otlp_buckets(&cumulative);
254
255 metrics.push(pb::Metric {
256 name: name.to_string(),
257 description: description.to_string(),
258 data: Some(pb::metric::Data::Histogram(pb::OtlpHistogram {
259 data_points: vec![pb::HistogramDataPoint {
260 time_unix_nano,
261 count,
262 sum: Some(sum as f64),
263 bucket_counts,
264 explicit_bounds,
265 ..Default::default()
266 }],
267 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
268 })),
269 ..Default::default()
270 });
271 }
272}
273
274fn exp_histogram_data_point(
276 snap: &ExpBucketsSnapshot,
277 attributes: Vec<pb::KeyValue>,
278 time_unix_nano: u64,
279) -> pb::ExponentialHistogramDataPoint {
280 let mut first_nonzero: Option<usize> = None;
282 let mut last_nonzero: Option<usize> = None;
283 for (i, &c) in snap.positive.iter().enumerate() {
284 if c > 0 {
285 if first_nonzero.is_none() {
286 first_nonzero = Some(i);
287 }
288 last_nonzero = Some(i);
289 }
290 }
291
292 let positive = match (first_nonzero, last_nonzero) {
293 (Some(first), Some(last)) => {
294 let bucket_counts: Vec<u64> = snap.positive[first..=last].to_vec();
295 Some(pb::exponential_histogram_data_point::Buckets {
296 offset: first as i32,
297 bucket_counts,
298 })
299 }
300 _ => None,
301 };
302
303 pb::ExponentialHistogramDataPoint {
304 attributes,
305 time_unix_nano,
306 count: snap.count,
307 sum: Some(snap.sum as f64),
308 scale: 0, zero_count: snap.zero_count,
310 positive,
311 negative: None, min: snap.min().map(|v| v as f64),
313 max: snap.max().map(|v| v as f64),
314 ..Default::default()
315 }
316}
317
318impl OtlpExport for Distribution {
319 fn export_otlp(
321 &self,
322 metrics: &mut Vec<pb::Metric>,
323 name: &str,
324 description: &str,
325 time_unix_nano: u64,
326 ) {
327 let snap = self.buckets_snapshot();
328 let dp = exp_histogram_data_point(&snap, Vec::new(), time_unix_nano);
329
330 metrics.push(pb::Metric {
331 name: name.to_string(),
332 description: description.to_string(),
333 data: Some(pb::metric::Data::ExponentialHistogram(
334 pb::OtlpExpHistogram {
335 data_points: vec![dp],
336 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
337 },
338 )),
339 ..Default::default()
340 });
341 }
342}
343
344impl<L: LabelEnum> OtlpExport for LabeledCounter<L> {
349 fn export_otlp(
350 &self,
351 metrics: &mut Vec<pb::Metric>,
352 name: &str,
353 description: &str,
354 time_unix_nano: u64,
355 ) {
356 let data_points: Vec<_> = self
357 .iter()
358 .map(|(label, count)| {
359 int_data_point(
360 count as i64,
361 vec![label_to_attribute(label)],
362 time_unix_nano,
363 )
364 })
365 .collect();
366
367 metrics.push(pb::Metric {
368 name: name.to_string(),
369 description: description.to_string(),
370 data: Some(pb::metric::Data::Sum(pb::Sum {
371 data_points,
372 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
373 is_monotonic: false,
374 })),
375 ..Default::default()
376 });
377 }
378}
379
380impl<L: LabelEnum> OtlpExport for LabeledGauge<L> {
381 fn export_otlp(
382 &self,
383 metrics: &mut Vec<pb::Metric>,
384 name: &str,
385 description: &str,
386 time_unix_nano: u64,
387 ) {
388 let data_points: Vec<_> = self
389 .iter()
390 .map(|(label, value)| {
391 int_data_point(value, vec![label_to_attribute(label)], time_unix_nano)
392 })
393 .collect();
394
395 metrics.push(pb::Metric {
396 name: name.to_string(),
397 description: description.to_string(),
398 data: Some(pb::metric::Data::Gauge(pb::OtlpGauge { data_points })),
399 ..Default::default()
400 });
401 }
402}
403
404impl<L: LabelEnum> OtlpExport for LabeledHistogram<L> {
405 fn export_otlp(
406 &self,
407 metrics: &mut Vec<pb::Metric>,
408 name: &str,
409 description: &str,
410 time_unix_nano: u64,
411 ) {
412 let mut data_points = Vec::new();
413
414 for (label, buckets, sum, count) in self.iter() {
415 let attrs = vec![label_to_attribute(label)];
416 let (bucket_counts, explicit_bounds) = cumulative_to_otlp_buckets(&buckets);
417
418 data_points.push(pb::HistogramDataPoint {
419 attributes: attrs,
420 time_unix_nano,
421 count,
422 sum: Some(sum as f64),
423 bucket_counts,
424 explicit_bounds,
425 ..Default::default()
426 });
427 }
428
429 metrics.push(pb::Metric {
430 name: name.to_string(),
431 description: description.to_string(),
432 data: Some(pb::metric::Data::Histogram(pb::OtlpHistogram {
433 data_points,
434 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
435 })),
436 ..Default::default()
437 });
438 }
439}
440
441impl OtlpExport for DynamicCounter {
446 fn export_otlp(
447 &self,
448 metrics: &mut Vec<pb::Metric>,
449 name: &str,
450 description: &str,
451 time_unix_nano: u64,
452 ) {
453 let mut data_points = Vec::new();
454 self.visit_series(|pairs, count| {
455 data_points.push(int_data_point(
456 count as i64,
457 pairs_to_attributes(pairs),
458 time_unix_nano,
459 ));
460 });
461
462 if data_points.is_empty() {
463 return;
464 }
465
466 metrics.push(pb::Metric {
467 name: name.to_string(),
468 description: description.to_string(),
469 data: Some(pb::metric::Data::Sum(pb::Sum {
470 data_points,
471 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
472 is_monotonic: false,
473 })),
474 ..Default::default()
475 });
476 }
477}
478
479impl OtlpExport for DynamicGauge {
480 fn export_otlp(
481 &self,
482 metrics: &mut Vec<pb::Metric>,
483 name: &str,
484 description: &str,
485 time_unix_nano: u64,
486 ) {
487 let mut data_points = Vec::new();
488 self.visit_series(|pairs, value| {
489 data_points.push(double_data_point(
490 value,
491 pairs_to_attributes(pairs),
492 time_unix_nano,
493 ));
494 });
495
496 if data_points.is_empty() {
497 return;
498 }
499
500 metrics.push(pb::Metric {
501 name: name.to_string(),
502 description: description.to_string(),
503 data: Some(pb::metric::Data::Gauge(pb::OtlpGauge { data_points })),
504 ..Default::default()
505 });
506 }
507}
508
509impl OtlpExport for DynamicGaugeI64 {
510 fn export_otlp(
511 &self,
512 metrics: &mut Vec<pb::Metric>,
513 name: &str,
514 description: &str,
515 time_unix_nano: u64,
516 ) {
517 let mut data_points = Vec::new();
518 self.visit_series(|pairs, value| {
519 data_points.push(int_data_point(
520 value,
521 pairs_to_attributes(pairs),
522 time_unix_nano,
523 ));
524 });
525
526 if data_points.is_empty() {
527 return;
528 }
529
530 metrics.push(pb::Metric {
531 name: name.to_string(),
532 description: description.to_string(),
533 data: Some(pb::metric::Data::Gauge(pb::OtlpGauge { data_points })),
534 ..Default::default()
535 });
536 }
537}
538
539impl OtlpExport for DynamicHistogram {
540 fn export_otlp(
541 &self,
542 metrics: &mut Vec<pb::Metric>,
543 name: &str,
544 description: &str,
545 time_unix_nano: u64,
546 ) {
547 let mut data_points = Vec::new();
548
549 self.visit_series(|pairs, series| {
550 let (bucket_counts, explicit_bounds) =
551 cumulative_to_otlp_buckets_iter(series.buckets_cumulative_iter());
552
553 data_points.push(pb::HistogramDataPoint {
554 attributes: pairs_to_attributes(pairs),
555 time_unix_nano,
556 count: series.count(),
557 sum: Some(series.sum() as f64),
558 bucket_counts,
559 explicit_bounds,
560 ..Default::default()
561 });
562 });
563
564 if data_points.is_empty() {
565 return;
566 }
567
568 metrics.push(pb::Metric {
569 name: name.to_string(),
570 description: description.to_string(),
571 data: Some(pb::metric::Data::Histogram(pb::OtlpHistogram {
572 data_points,
573 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
574 })),
575 ..Default::default()
576 });
577 }
578}
579
580impl OtlpExport for DynamicDistribution {
581 fn export_otlp(
583 &self,
584 metrics: &mut Vec<pb::Metric>,
585 name: &str,
586 description: &str,
587 time_unix_nano: u64,
588 ) {
589 let mut data_points = Vec::new();
590
591 self.visit_series(|pairs, _count, _sum, snap| {
592 let attrs = pairs_to_attributes(pairs);
593 data_points.push(exp_histogram_data_point(&snap, attrs, time_unix_nano));
594 });
595
596 if data_points.is_empty() {
597 return;
598 }
599
600 metrics.push(pb::Metric {
601 name: name.to_string(),
602 description: description.to_string(),
603 data: Some(pb::metric::Data::ExponentialHistogram(
604 pb::OtlpExpHistogram {
605 data_points,
606 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
607 },
608 )),
609 ..Default::default()
610 });
611 }
612}
613
614use crate::span::{CompletedSpan, SpanKind, SpanStatus, SpanValue};
619
620impl CompletedSpan {
621 pub fn to_otlp(&self) -> pb::OtlpSpan {
623 let kind = match self.kind {
624 SpanKind::Internal => pb::OtlpSpanKind::Internal,
625 SpanKind::Server => pb::OtlpSpanKind::Server,
626 SpanKind::Client => pb::OtlpSpanKind::Client,
627 SpanKind::Producer => pb::OtlpSpanKind::Producer,
628 SpanKind::Consumer => pb::OtlpSpanKind::Consumer,
629 };
630
631 let status = match &self.status {
632 SpanStatus::Unset => None,
633 SpanStatus::Ok => Some(pb::OtlpStatus {
634 code: pb::OtlpStatusCode::Ok as i32,
635 message: String::new(),
636 }),
637 SpanStatus::Error { message } => Some(pb::OtlpStatus {
638 code: pb::OtlpStatusCode::Error as i32,
639 message: message.to_string(),
640 }),
641 };
642
643 let attributes: Vec<pb::KeyValue> = self
644 .attributes
645 .iter()
646 .map(|attr| {
647 let value = match &attr.value {
648 SpanValue::String(s) => pb::any_value::Value::StringValue(s.to_string()),
649 SpanValue::I64(v) => pb::any_value::Value::IntValue(*v),
650 SpanValue::F64(v) => pb::any_value::Value::DoubleValue(*v),
651 SpanValue::Bool(v) => pb::any_value::Value::BoolValue(*v),
652 SpanValue::Uuid(u) => pb::any_value::Value::StringValue(u.to_string()),
653 };
654 pb::KeyValue {
655 key: attr.key.to_string(),
656 value: Some(pb::AnyValue { value: Some(value) }),
657 }
658 })
659 .collect();
660
661 let events: Vec<pb::OtlpEvent> = self
662 .events
663 .iter()
664 .map(|evt| {
665 let attrs: Vec<pb::KeyValue> = evt
666 .attributes
667 .iter()
668 .map(|a| {
669 let v = match &a.value {
670 SpanValue::String(s) => {
671 pb::any_value::Value::StringValue(s.to_string())
672 }
673 SpanValue::I64(v) => pb::any_value::Value::IntValue(*v),
674 SpanValue::F64(v) => pb::any_value::Value::DoubleValue(*v),
675 SpanValue::Bool(v) => pb::any_value::Value::BoolValue(*v),
676 SpanValue::Uuid(u) => pb::any_value::Value::StringValue(u.to_string()),
677 };
678 pb::KeyValue {
679 key: a.key.to_string(),
680 value: Some(pb::AnyValue { value: Some(v) }),
681 }
682 })
683 .collect();
684 pb::OtlpEvent {
685 time_unix_nano: evt.time_ns,
686 name: evt.name.to_string(),
687 attributes: attrs,
688 dropped_attributes_count: 0,
689 }
690 })
691 .collect();
692
693 pb::OtlpSpan {
694 trace_id: self.trace_id.as_bytes().to_vec(),
695 span_id: self.span_id.as_bytes().to_vec(),
696 parent_span_id: if self.parent_span_id.is_invalid() {
697 Vec::new()
698 } else {
699 self.parent_span_id.as_bytes().to_vec()
700 },
701 name: self.name.to_string(),
702 kind: kind as i32,
703 start_time_unix_nano: self.start_time_ns,
704 end_time_unix_nano: self.end_time_ns,
705 attributes,
706 events,
707 status,
708 ..Default::default()
709 }
710 }
711}
712
713pub fn build_trace_export_request(
717 resource: &pb::Resource,
718 scope_name: &str,
719 spans: Vec<pb::OtlpSpan>,
720) -> pb::ExportTraceServiceRequest {
721 pb::ExportTraceServiceRequest {
722 resource_spans: vec![pb::ResourceSpans {
723 resource: Some(resource.clone()),
724 scope_spans: vec![pb::ScopeSpans {
725 scope: Some(pb::InstrumentationScope {
726 name: scope_name.to_string(),
727 ..Default::default()
728 }),
729 spans,
730 ..Default::default()
731 }],
732 ..Default::default()
733 }],
734 }
735}
736
737#[cfg(test)]
738mod tests {
739 use super::*;
740
741 fn test_timestamp() -> u64 {
742 1_000_000_000 }
744
745 #[test]
746 fn test_counter_otlp() {
747 let counter = Counter::new(4);
748 counter.add(42);
749
750 let mut metrics = Vec::new();
751 counter.export_otlp(
752 &mut metrics,
753 "test_counter",
754 "A test counter",
755 test_timestamp(),
756 );
757
758 assert_eq!(metrics.len(), 1);
759 assert_eq!(metrics[0].name, "test_counter");
760 assert_eq!(metrics[0].description, "A test counter");
761
762 let data = metrics[0].data.as_ref().expect("missing data");
763 match data {
764 pb::metric::Data::Sum(sum) => {
765 assert!(!sum.is_monotonic);
767 assert_eq!(
768 sum.aggregation_temporality,
769 pb::AggregationTemporality::Cumulative as i32
770 );
771 assert_eq!(sum.data_points.len(), 1);
772 assert_eq!(
773 sum.data_points[0].value,
774 Some(pb::number_data_point::Value::AsInt(42))
775 );
776 assert_eq!(sum.data_points[0].time_unix_nano, test_timestamp());
777 }
778 _ => panic!("expected Sum, got {:?}", data),
779 }
780 }
781
782 #[test]
783 fn test_gauge_otlp() {
784 let gauge = Gauge::new();
785 gauge.set(-10);
786
787 let mut metrics = Vec::new();
788 gauge.export_otlp(&mut metrics, "test_gauge", "A test gauge", test_timestamp());
789
790 assert_eq!(metrics.len(), 1);
791 match metrics[0].data.as_ref().expect("missing data") {
792 pb::metric::Data::Gauge(g) => {
793 assert_eq!(g.data_points.len(), 1);
794 assert_eq!(
795 g.data_points[0].value,
796 Some(pb::number_data_point::Value::AsInt(-10))
797 );
798 }
799 other => panic!("expected Gauge, got {:?}", other),
800 }
801 }
802
803 #[test]
804 fn test_gauge_f64_otlp() {
805 let gauge = GaugeF64::new();
806 gauge.set(3.125);
807
808 let mut metrics = Vec::new();
809 gauge.export_otlp(&mut metrics, "test_gauge_f64", "", test_timestamp());
810
811 match metrics[0].data.as_ref().expect("missing data") {
812 pb::metric::Data::Gauge(g) => {
813 assert_eq!(g.data_points.len(), 1);
814 match g.data_points[0].value {
815 Some(pb::number_data_point::Value::AsDouble(v)) => {
816 assert!((v - 3.125).abs() < 1e-10);
817 }
818 ref other => panic!("expected AsDouble, got {:?}", other),
819 }
820 }
821 other => panic!("expected Gauge, got {:?}", other),
822 }
823 }
824
825 #[test]
826 fn test_histogram_otlp() {
827 let h = Histogram::new(&[10, 100], 4);
828 h.record(5);
829 h.record(50);
830 h.record(500);
831
832 let mut metrics = Vec::new();
833 h.export_otlp(
834 &mut metrics,
835 "test_hist",
836 "A test histogram",
837 test_timestamp(),
838 );
839
840 assert_eq!(metrics.len(), 1);
841 match metrics[0].data.as_ref().expect("missing data") {
842 pb::metric::Data::Histogram(hist) => {
843 assert_eq!(
844 hist.aggregation_temporality,
845 pb::AggregationTemporality::Cumulative as i32
846 );
847 assert_eq!(hist.data_points.len(), 1);
848
849 let dp = &hist.data_points[0];
850 assert_eq!(dp.count, 3);
851 assert_eq!(dp.sum, Some(555.0));
852 assert_eq!(dp.explicit_bounds, vec![10.0, 100.0]);
853 assert_eq!(dp.bucket_counts, vec![1, 1, 1]);
854 assert_eq!(dp.time_unix_nano, test_timestamp());
855 }
856 other => panic!("expected Histogram, got {:?}", other),
857 }
858 }
859
860 #[test]
861 fn test_distribution_otlp() {
862 let dist = Distribution::new(4);
863 dist.record(100);
864 dist.record(200);
865 dist.record(300);
866
867 let mut metrics = Vec::new();
868 dist.export_otlp(
869 &mut metrics,
870 "test_dist",
871 "A distribution",
872 test_timestamp(),
873 );
874
875 assert_eq!(metrics.len(), 1);
876 assert_eq!(metrics[0].name, "test_dist");
877
878 match metrics[0].data.as_ref().expect("missing data") {
879 pb::metric::Data::ExponentialHistogram(hist) => {
880 assert_eq!(
881 hist.aggregation_temporality,
882 pb::AggregationTemporality::Cumulative as i32
883 );
884 assert_eq!(hist.data_points.len(), 1);
885
886 let dp = &hist.data_points[0];
887 assert_eq!(dp.count, 3);
888 assert_eq!(dp.sum, Some(600.0));
889 assert_eq!(dp.scale, 0);
890 assert_eq!(dp.zero_count, 0);
891 assert_eq!(dp.time_unix_nano, test_timestamp());
892 assert!(dp.positive.is_some());
894 let positive = dp.positive.as_ref().expect("positive buckets");
895 assert!(!positive.bucket_counts.is_empty());
897 }
898 other => panic!("expected ExponentialHistogram, got {:?}", other),
899 }
900 }
901
902 #[test]
903 fn test_dynamic_counter_otlp() {
904 let counter = DynamicCounter::new(4);
905 counter.add(&[("env", "prod"), ("region", "us")], 10);
906 counter.add(&[("env", "staging"), ("region", "eu")], 5);
907
908 let mut metrics = Vec::new();
909 counter.export_otlp(&mut metrics, "requests", "Request count", test_timestamp());
910
911 assert_eq!(metrics.len(), 1);
912 match metrics[0].data.as_ref().expect("missing data") {
913 pb::metric::Data::Sum(sum) => {
914 assert!(!sum.is_monotonic);
915 assert_eq!(sum.data_points.len(), 2);
916 for dp in &sum.data_points {
917 assert_eq!(dp.attributes.len(), 2);
918 }
919 }
920 other => panic!("expected Sum, got {:?}", other),
921 }
922 }
923
924 #[test]
925 fn test_build_export_request() {
926 let resource = build_resource("test-service", &[("version", "1.0")]);
927 let counter = Counter::new(4);
928 counter.add(1);
929
930 let mut metrics = Vec::new();
931 counter.export_otlp(&mut metrics, "my_counter", "", test_timestamp());
932
933 let request = build_export_request(&resource, "fast-telemetry", metrics);
934
935 assert_eq!(request.resource_metrics.len(), 1);
936 let rm = &request.resource_metrics[0];
937 let res = rm.resource.as_ref().expect("missing resource");
938 assert_eq!(res.attributes.len(), 2); assert_eq!(res.attributes[0].key, "service.name");
940
941 assert_eq!(rm.scope_metrics.len(), 1);
942 let sm = &rm.scope_metrics[0];
943 let scope = sm.scope.as_ref().expect("missing scope");
944 assert_eq!(scope.name, "fast-telemetry");
945 assert_eq!(sm.metrics.len(), 1);
946 }
947
948 #[test]
949 fn test_make_kv() {
950 let kv = make_kv("foo", "bar");
951 assert_eq!(kv.key, "foo");
952 match kv
953 .value
954 .expect("missing value")
955 .value
956 .expect("missing inner")
957 {
958 pb::any_value::Value::StringValue(s) => assert_eq!(s, "bar"),
959 other => panic!("expected StringValue, got {:?}", other),
960 }
961 }
962
963 #[derive(Copy, Clone, Debug)]
966 enum TestLabel {
967 A,
968 B,
969 C,
970 }
971
972 impl crate::LabelEnum for TestLabel {
973 const CARDINALITY: usize = 3;
974 const LABEL_NAME: &'static str = "test";
975
976 fn as_index(self) -> usize {
977 self as usize
978 }
979 fn from_index(index: usize) -> Self {
980 match index {
981 0 => Self::A,
982 1 => Self::B,
983 _ => Self::C,
984 }
985 }
986 fn variant_name(self) -> &'static str {
987 match self {
988 Self::A => "a",
989 Self::B => "b",
990 Self::C => "c",
991 }
992 }
993 }
994
995 #[test]
996 fn test_labeled_counter_otlp() {
997 let counter = LabeledCounter::<TestLabel>::new(4);
998 counter.add(TestLabel::A, 10);
999 counter.add(TestLabel::B, 20);
1000
1001 let mut metrics = Vec::new();
1002 counter.export_otlp(
1003 &mut metrics,
1004 "labeled_counter",
1005 "By label",
1006 test_timestamp(),
1007 );
1008
1009 assert_eq!(metrics.len(), 1);
1010 match metrics[0].data.as_ref().expect("missing data") {
1011 pb::metric::Data::Sum(sum) => {
1012 assert!(!sum.is_monotonic);
1013 assert_eq!(sum.data_points.len(), 3); let dp_a = sum.data_points.iter().find(|dp| {
1016 dp.attributes.iter().any(|kv| kv.key == "test" && matches!(&kv.value, Some(v) if matches!(&v.value, Some(pb::any_value::Value::StringValue(s)) if s == "a")))
1017 }).expect("missing data point for label A");
1018 assert_eq!(dp_a.value, Some(pb::number_data_point::Value::AsInt(10)));
1019 }
1020 other => panic!("expected Sum, got {:?}", other),
1021 }
1022 }
1023
1024 #[test]
1025 fn test_labeled_gauge_otlp() {
1026 let gauge = LabeledGauge::<TestLabel>::new();
1027 gauge.set(TestLabel::A, 42);
1028 gauge.set(TestLabel::C, -5);
1029
1030 let mut metrics = Vec::new();
1031 gauge.export_otlp(&mut metrics, "labeled_gauge", "By label", test_timestamp());
1032
1033 assert_eq!(metrics.len(), 1);
1034 match metrics[0].data.as_ref().expect("missing data") {
1035 pb::metric::Data::Gauge(g) => {
1036 assert_eq!(g.data_points.len(), 3);
1037 }
1038 other => panic!("expected Gauge, got {:?}", other),
1039 }
1040 }
1041
1042 #[test]
1043 fn test_labeled_histogram_otlp() {
1044 let h = LabeledHistogram::<TestLabel>::new(&[10, 100], 4);
1045 h.record(TestLabel::A, 5);
1046 h.record(TestLabel::A, 50);
1047 h.record(TestLabel::B, 500);
1048
1049 let mut metrics = Vec::new();
1050 h.export_otlp(&mut metrics, "labeled_hist", "By label", test_timestamp());
1051
1052 assert_eq!(metrics.len(), 1);
1053 match metrics[0].data.as_ref().expect("missing data") {
1054 pb::metric::Data::Histogram(hist) => {
1055 assert_eq!(
1056 hist.aggregation_temporality,
1057 pb::AggregationTemporality::Cumulative as i32
1058 );
1059 assert_eq!(hist.data_points.len(), 3); for dp in &hist.data_points {
1062 assert_eq!(dp.attributes.len(), 1);
1063 assert_eq!(dp.attributes[0].key, "test");
1064 assert_eq!(dp.time_unix_nano, test_timestamp());
1065 }
1066 }
1067 other => panic!("expected Histogram, got {:?}", other),
1068 }
1069 }
1070
1071 #[test]
1074 fn test_dynamic_gauge_otlp() {
1075 let gauge = DynamicGauge::new(4);
1076 gauge.set(&[("host", "node1")], 3.125);
1077 gauge.set(&[("host", "node2")], 2.72);
1078
1079 let mut metrics = Vec::new();
1080 gauge.export_otlp(
1081 &mut metrics,
1082 "cpu_usage",
1083 "CPU percentage",
1084 test_timestamp(),
1085 );
1086
1087 assert_eq!(metrics.len(), 1);
1088 match metrics[0].data.as_ref().expect("missing data") {
1089 pb::metric::Data::Gauge(g) => {
1090 assert_eq!(g.data_points.len(), 2);
1091 for dp in &g.data_points {
1092 assert_eq!(dp.attributes.len(), 1);
1093 assert!(matches!(
1094 dp.value,
1095 Some(pb::number_data_point::Value::AsDouble(_))
1096 ));
1097 }
1098 }
1099 other => panic!("expected Gauge, got {:?}", other),
1100 }
1101 }
1102
1103 #[test]
1104 fn test_dynamic_gauge_i64_otlp() {
1105 let gauge = DynamicGaugeI64::new(4);
1106 gauge.set(&[("region", "us")], 100);
1107 gauge.set(&[("region", "eu")], 200);
1108
1109 let mut metrics = Vec::new();
1110 gauge.export_otlp(
1111 &mut metrics,
1112 "connections",
1113 "Active connections",
1114 test_timestamp(),
1115 );
1116
1117 assert_eq!(metrics.len(), 1);
1118 match metrics[0].data.as_ref().expect("missing data") {
1119 pb::metric::Data::Gauge(g) => {
1120 assert_eq!(g.data_points.len(), 2);
1121 for dp in &g.data_points {
1122 assert_eq!(dp.attributes.len(), 1);
1123 assert!(matches!(
1124 dp.value,
1125 Some(pb::number_data_point::Value::AsInt(_))
1126 ));
1127 }
1128 }
1129 other => panic!("expected Gauge, got {:?}", other),
1130 }
1131 }
1132
1133 #[test]
1134 fn test_dynamic_histogram_otlp() {
1135 let h = DynamicHistogram::new(&[10, 100, 1000], 4);
1136 h.record(&[("endpoint", "/api")], 5);
1137 h.record(&[("endpoint", "/api")], 50);
1138 h.record(&[("endpoint", "/health")], 500);
1139
1140 let mut metrics = Vec::new();
1141 h.export_otlp(&mut metrics, "latency", "Request latency", test_timestamp());
1142
1143 assert_eq!(metrics.len(), 1);
1144 match metrics[0].data.as_ref().expect("missing data") {
1145 pb::metric::Data::Histogram(hist) => {
1146 assert_eq!(
1147 hist.aggregation_temporality,
1148 pb::AggregationTemporality::Cumulative as i32
1149 );
1150 assert_eq!(hist.data_points.len(), 2); for dp in &hist.data_points {
1152 assert_eq!(dp.attributes.len(), 1);
1153 assert_eq!(dp.attributes[0].key, "endpoint");
1154 assert_eq!(dp.time_unix_nano, test_timestamp());
1155 assert_eq!(dp.explicit_bounds, vec![10.0, 100.0, 1000.0]);
1157 }
1158 }
1159 other => panic!("expected Histogram, got {:?}", other),
1160 }
1161 }
1162
1163 #[test]
1164 fn test_dynamic_distribution_otlp() {
1165 let dist = DynamicDistribution::new(4);
1166 dist.record(&[("method", "GET")], 100);
1167 dist.record(&[("method", "GET")], 200);
1168 dist.record(&[("method", "POST")], 300);
1169
1170 let mut metrics = Vec::new();
1171 dist.export_otlp(
1172 &mut metrics,
1173 "response_size",
1174 "Size in bytes",
1175 test_timestamp(),
1176 );
1177
1178 assert_eq!(metrics.len(), 1);
1179 assert_eq!(metrics[0].name, "response_size");
1180
1181 match metrics[0].data.as_ref().expect("missing data") {
1182 pb::metric::Data::ExponentialHistogram(hist) => {
1183 assert_eq!(
1184 hist.aggregation_temporality,
1185 pb::AggregationTemporality::Cumulative as i32
1186 );
1187 assert_eq!(hist.data_points.len(), 2); for dp in &hist.data_points {
1189 assert_eq!(dp.attributes.len(), 1);
1190 assert_eq!(dp.attributes[0].key, "method");
1191 assert_eq!(dp.scale, 0);
1192 assert!(dp.positive.is_some());
1193 }
1194 }
1195 other => panic!("expected ExponentialHistogram, got {:?}", other),
1196 }
1197 }
1198
1199 #[test]
1200 fn test_empty_dynamic_metrics_produce_nothing() {
1201 let counter = DynamicCounter::new(4);
1202 let gauge = DynamicGauge::new(4);
1203 let gauge_i64 = DynamicGaugeI64::new(4);
1204 let hist = DynamicHistogram::new(&[10], 4);
1205 let dist = DynamicDistribution::new(4);
1206
1207 let mut metrics = Vec::new();
1208 let ts = test_timestamp();
1209 counter.export_otlp(&mut metrics, "c", "", ts);
1210 gauge.export_otlp(&mut metrics, "g", "", ts);
1211 gauge_i64.export_otlp(&mut metrics, "gi", "", ts);
1212 hist.export_otlp(&mut metrics, "h", "", ts);
1213 dist.export_otlp(&mut metrics, "d", "", ts);
1214
1215 assert!(
1216 metrics.is_empty(),
1217 "empty dynamic metrics should produce no output"
1218 );
1219 }
1220
1221 #[test]
1222 fn test_cumulative_to_otlp_buckets_helper() {
1223 let cumulative = vec![(10, 1), (100, 3), (u64::MAX, 5)];
1226 let (counts, bounds) = cumulative_to_otlp_buckets(&cumulative);
1227 assert_eq!(counts, vec![1, 2, 2]);
1228 assert_eq!(bounds, vec![10.0, 100.0]);
1229 }
1230
1231 #[test]
1234 fn test_completed_span_to_otlp() {
1235 use crate::span::{SpanAttribute, SpanEvent, SpanKind, SpanStatus};
1236 use crate::span::{SpanId, TraceId};
1237
1238 let completed = CompletedSpan {
1239 trace_id: TraceId::from_hex("4bf92f3577b34da6a3ce929d0e0e4736").unwrap(),
1240 span_id: SpanId::from_hex("00f067aa0ba902b7").unwrap(),
1241 parent_span_id: SpanId::from_hex("1234567890abcdef").unwrap(),
1242 name: "test_operation".into(),
1243 kind: SpanKind::Server,
1244 start_time_ns: 1_000_000_000,
1245 end_time_ns: 2_000_000_000,
1246 status: SpanStatus::Ok,
1247 attributes: vec![
1248 SpanAttribute::new("http.method", "GET"),
1249 SpanAttribute::new("http.status_code", 200i64),
1250 ],
1251 events: vec![SpanEvent {
1252 name: "processing".into(),
1253 time_ns: 1_500_000_000,
1254 attributes: vec![SpanAttribute::new("step", "validate")],
1255 }],
1256 };
1257
1258 let otlp = completed.to_otlp();
1259
1260 assert_eq!(
1261 otlp.trace_id,
1262 &[
1263 0x4b, 0xf9, 0x2f, 0x35, 0x77, 0xb3, 0x4d, 0xa6, 0xa3, 0xce, 0x92, 0x9d, 0x0e, 0x0e,
1264 0x47, 0x36
1265 ]
1266 );
1267 assert_eq!(
1268 otlp.span_id,
1269 &[0x00, 0xf0, 0x67, 0xaa, 0x0b, 0xa9, 0x02, 0xb7]
1270 );
1271 assert_eq!(
1272 otlp.parent_span_id,
1273 &[0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef]
1274 );
1275 assert_eq!(otlp.name, "test_operation");
1276 assert_eq!(otlp.kind, pb::OtlpSpanKind::Server as i32);
1277 assert_eq!(otlp.start_time_unix_nano, 1_000_000_000);
1278 assert_eq!(otlp.end_time_unix_nano, 2_000_000_000);
1279
1280 let status = otlp.status.unwrap();
1282 assert_eq!(status.code, pb::OtlpStatusCode::Ok as i32);
1283
1284 assert_eq!(otlp.attributes.len(), 2);
1286 assert_eq!(otlp.attributes[0].key, "http.method");
1287 assert_eq!(otlp.attributes[1].key, "http.status_code");
1288
1289 assert_eq!(otlp.events.len(), 1);
1291 assert_eq!(otlp.events[0].name, "processing");
1292 assert_eq!(otlp.events[0].time_unix_nano, 1_500_000_000);
1293 assert_eq!(otlp.events[0].attributes.len(), 1);
1294 }
1295
1296 #[test]
1297 fn test_completed_span_root_has_empty_parent() {
1298 use crate::span::{SpanId, TraceId};
1299
1300 let completed = CompletedSpan {
1301 trace_id: TraceId::random(),
1302 span_id: SpanId::random(),
1303 parent_span_id: SpanId::INVALID,
1304 name: "root".into(),
1305 kind: SpanKind::Server,
1306 start_time_ns: 1_000_000_000,
1307 end_time_ns: 2_000_000_000,
1308 status: SpanStatus::Unset,
1309 attributes: Vec::new(),
1310 events: Vec::new(),
1311 };
1312
1313 let otlp = completed.to_otlp();
1314 assert!(
1315 otlp.parent_span_id.is_empty(),
1316 "root span should have empty parent_span_id"
1317 );
1318 assert!(otlp.status.is_none(), "Unset status should map to None");
1319 }
1320
1321 #[test]
1322 fn test_completed_span_error_status() {
1323 use crate::span::{SpanId, TraceId};
1324
1325 let completed = CompletedSpan {
1326 trace_id: TraceId::random(),
1327 span_id: SpanId::random(),
1328 parent_span_id: SpanId::INVALID,
1329 name: "failing_op".into(),
1330 kind: SpanKind::Internal,
1331 start_time_ns: 1_000_000_000,
1332 end_time_ns: 2_000_000_000,
1333 status: SpanStatus::Error {
1334 message: "connection refused".into(),
1335 },
1336 attributes: Vec::new(),
1337 events: Vec::new(),
1338 };
1339
1340 let otlp = completed.to_otlp();
1341 let status = otlp.status.unwrap();
1342 assert_eq!(status.code, pb::OtlpStatusCode::Error as i32);
1343 assert_eq!(status.message, "connection refused");
1344 }
1345
1346 #[test]
1347 fn test_build_trace_export_request() {
1348 use crate::span::{SpanId, TraceId};
1349
1350 let resource = build_resource("test-service", &[("version", "1.0")]);
1351 let completed = CompletedSpan {
1352 trace_id: TraceId::random(),
1353 span_id: SpanId::random(),
1354 parent_span_id: SpanId::INVALID,
1355 name: "test".into(),
1356 kind: SpanKind::Server,
1357 start_time_ns: 1_000_000_000,
1358 end_time_ns: 2_000_000_000,
1359 status: SpanStatus::Ok,
1360 attributes: Vec::new(),
1361 events: Vec::new(),
1362 };
1363
1364 let otlp_span = completed.to_otlp();
1365 let request = build_trace_export_request(&resource, "fast-telemetry", vec![otlp_span]);
1366
1367 assert_eq!(request.resource_spans.len(), 1);
1368 let rs = &request.resource_spans[0];
1369 let res = rs.resource.as_ref().unwrap();
1370 assert_eq!(res.attributes.len(), 2); assert_eq!(rs.scope_spans.len(), 1);
1373 let ss = &rs.scope_spans[0];
1374 let scope = ss.scope.as_ref().unwrap();
1375 assert_eq!(scope.name, "fast-telemetry");
1376 assert_eq!(ss.spans.len(), 1);
1377 }
1378}