1use crate::exp_buckets::ExpBucketsSnapshot;
10use crate::{
11 Counter, Distribution, DynamicCounter, DynamicDistribution, DynamicGauge, DynamicGaugeI64,
12 DynamicHistogram, Gauge, GaugeF64, Histogram, LabelEnum, LabeledCounter, LabeledGauge,
13 LabeledHistogram, LabeledSampledTimer, MaxGauge, MaxGaugeF64, MinGauge, MinGaugeF64,
14 SampledTimer,
15};
16
/// Re-exports of the `opentelemetry_proto` (tonic flavour) message types used
/// by the OTLP export code in this file, gathered under one short path.
pub mod pb {
    // Collector request envelopes for metrics and traces.
    pub use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
    pub use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
    // Attribute / scope building blocks shared by metrics and traces.
    pub use opentelemetry_proto::tonic::common::v1::{
        AnyValue, InstrumentationScope, KeyValue, any_value,
    };
    // Metric messages; several are renamed with an `Otlp` prefix, and `self`
    // additionally re-exports the `v1` module itself.
    pub use opentelemetry_proto::tonic::metrics::v1::{
        self, AggregationTemporality, ExponentialHistogram as OtlpExpHistogram,
        ExponentialHistogramDataPoint, Gauge as OtlpGauge, Histogram as OtlpHistogram,
        HistogramDataPoint, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, Sum,
        exponential_histogram_data_point, metric, number_data_point,
    };
    pub use opentelemetry_proto::tonic::resource::v1::Resource;
    // Trace messages, likewise renamed with an `Otlp` prefix.
    pub use opentelemetry_proto::tonic::trace::v1::{
        ResourceSpans, ScopeSpans, Span as OtlpSpan, Status as OtlpStatus,
        span::{Event as OtlpEvent, SpanKind as OtlpSpanKind},
        status::StatusCode as OtlpStatusCode,
    };
}
37
/// Conversion of a metric into OTLP protobuf `Metric` messages.
pub trait OtlpExport {
    /// Appends this metric's OTLP representation to `metrics`.
    ///
    /// * `metrics` - output vector; implementations in this file push zero,
    ///   one, or two `Metric` entries onto it.
    /// * `name` / `description` - copied into the emitted `Metric`
    ///   (some implementations derive suffixed names from `name`).
    /// * `time_unix_nano` - timestamp stamped on every emitted data point.
    fn export_otlp(
        &self,
        metrics: &mut Vec<pb::Metric>,
        name: &str,
        description: &str,
        time_unix_nano: u64,
    );
}
54
55fn make_kv(key: &str, value: &str) -> pb::KeyValue {
60 pb::KeyValue {
61 key: key.to_string(),
62 value: Some(pb::AnyValue {
63 value: Some(pb::any_value::Value::StringValue(value.to_string())),
64 }),
65 }
66}
67
68fn pairs_to_attributes(pairs: &[(String, String)]) -> Vec<pb::KeyValue> {
69 pairs.iter().map(|(k, v)| make_kv(k, v)).collect()
70}
71
72fn label_to_attribute<L: LabelEnum>(label: L) -> pb::KeyValue {
73 make_kv(L::LABEL_NAME, label.variant_name())
74}
75
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch the result is `0`
/// (the failed `duration_since` falls back to a zero duration). If the
/// nanosecond count does not fit in a `u64` the result saturates at
/// `u64::MAX` instead of silently wrapping.
pub fn now_nanos() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    // `as_nanos` yields a `u128`; convert with a check rather than an `as` cast.
    u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
}
85
86fn int_data_point(
87 value: i64,
88 attributes: Vec<pb::KeyValue>,
89 time_unix_nano: u64,
90) -> pb::NumberDataPoint {
91 pb::NumberDataPoint {
92 attributes,
93 time_unix_nano,
94 value: Some(pb::number_data_point::Value::AsInt(value)),
95 ..Default::default()
96 }
97}
98
99fn double_data_point(
100 value: f64,
101 attributes: Vec<pb::KeyValue>,
102 time_unix_nano: u64,
103) -> pb::NumberDataPoint {
104 pb::NumberDataPoint {
105 attributes,
106 time_unix_nano,
107 value: Some(pb::number_data_point::Value::AsDouble(value)),
108 ..Default::default()
109 }
110}
111
/// Converts `(upper_bound, cumulative_count)` pairs into the per-bucket
/// counts and explicit bounds used by an OTLP histogram data point.
///
/// Each output count is the difference between consecutive cumulative
/// counts (saturating at zero). A bound equal to `u64::MAX` contributes a
/// count but no explicit bound.
fn cumulative_to_otlp_buckets_iter(
    cumulative: impl IntoIterator<Item = (u64, u64)>,
) -> (Vec<u64>, Vec<f64>) {
    let entries = cumulative.into_iter();
    let expected = entries.size_hint().0;
    let mut per_bucket = Vec::with_capacity(expected);
    let mut finite_bounds = Vec::with_capacity(expected.saturating_sub(1));

    let mut previous_total = 0u64;
    entries.for_each(|(upper, running_total)| {
        per_bucket.push(running_total.saturating_sub(previous_total));
        previous_total = running_total;
        if upper < u64::MAX {
            finite_bounds.push(upper as f64);
        }
    });

    (per_bucket, finite_bounds)
}
131
132pub fn build_resource(service_name: &str, attrs: &[(&str, &str)]) -> pb::Resource {
134 let mut attributes = vec![make_kv("service.name", service_name)];
135 for (k, v) in attrs {
136 attributes.push(make_kv(k, v));
137 }
138 pb::Resource {
139 attributes,
140 ..Default::default()
141 }
142}
143
144pub fn build_export_request(
148 resource: &pb::Resource,
149 scope_name: &str,
150 metrics: Vec<pb::Metric>,
151) -> pb::ExportMetricsServiceRequest {
152 pb::ExportMetricsServiceRequest {
153 resource_metrics: vec![pb::ResourceMetrics {
154 resource: Some(resource.clone()),
155 scope_metrics: vec![pb::ScopeMetrics {
156 scope: Some(pb::InstrumentationScope {
157 name: scope_name.to_string(),
158 ..Default::default()
159 }),
160 metrics,
161 ..Default::default()
162 }],
163 ..Default::default()
164 }],
165 }
166}
167
168impl OtlpExport for Counter {
173 fn export_otlp(
174 &self,
175 metrics: &mut Vec<pb::Metric>,
176 name: &str,
177 description: &str,
178 time_unix_nano: u64,
179 ) {
180 let value = self.sum() as i64;
181 metrics.push(pb::Metric {
182 name: name.to_string(),
183 description: description.to_string(),
184 data: Some(pb::metric::Data::Sum(pb::Sum {
185 data_points: vec![int_data_point(value, Vec::new(), time_unix_nano)],
188 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
189 is_monotonic: false,
190 })),
191 ..Default::default()
192 });
193 }
194}
195
196impl OtlpExport for Gauge {
197 fn export_otlp(
198 &self,
199 metrics: &mut Vec<pb::Metric>,
200 name: &str,
201 description: &str,
202 time_unix_nano: u64,
203 ) {
204 metrics.push(pb::Metric {
205 name: name.to_string(),
206 description: description.to_string(),
207 data: Some(pb::metric::Data::Gauge(pb::OtlpGauge {
208 data_points: vec![int_data_point(self.get(), Vec::new(), time_unix_nano)],
209 })),
210 ..Default::default()
211 });
212 }
213}
214
215impl OtlpExport for GaugeF64 {
216 fn export_otlp(
217 &self,
218 metrics: &mut Vec<pb::Metric>,
219 name: &str,
220 description: &str,
221 time_unix_nano: u64,
222 ) {
223 metrics.push(pb::Metric {
224 name: name.to_string(),
225 description: description.to_string(),
226 data: Some(pb::metric::Data::Gauge(pb::OtlpGauge {
227 data_points: vec![double_data_point(self.get(), Vec::new(), time_unix_nano)],
228 })),
229 ..Default::default()
230 });
231 }
232}
233
234impl OtlpExport for MaxGauge {
235 fn export_otlp(
236 &self,
237 metrics: &mut Vec<pb::Metric>,
238 name: &str,
239 description: &str,
240 time_unix_nano: u64,
241 ) {
242 metrics.push(pb::Metric {
243 name: name.to_string(),
244 description: description.to_string(),
245 data: Some(pb::metric::Data::Gauge(pb::OtlpGauge {
246 data_points: vec![int_data_point(self.get(), Vec::new(), time_unix_nano)],
247 })),
248 ..Default::default()
249 });
250 }
251}
252
253impl OtlpExport for MaxGaugeF64 {
254 fn export_otlp(
255 &self,
256 metrics: &mut Vec<pb::Metric>,
257 name: &str,
258 description: &str,
259 time_unix_nano: u64,
260 ) {
261 metrics.push(pb::Metric {
262 name: name.to_string(),
263 description: description.to_string(),
264 data: Some(pb::metric::Data::Gauge(pb::OtlpGauge {
265 data_points: vec![double_data_point(self.get(), Vec::new(), time_unix_nano)],
266 })),
267 ..Default::default()
268 });
269 }
270}
271
272impl OtlpExport for MinGauge {
273 fn export_otlp(
274 &self,
275 metrics: &mut Vec<pb::Metric>,
276 name: &str,
277 description: &str,
278 time_unix_nano: u64,
279 ) {
280 metrics.push(pb::Metric {
281 name: name.to_string(),
282 description: description.to_string(),
283 data: Some(pb::metric::Data::Gauge(pb::OtlpGauge {
284 data_points: vec![int_data_point(self.get(), Vec::new(), time_unix_nano)],
285 })),
286 ..Default::default()
287 });
288 }
289}
290
291impl OtlpExport for MinGaugeF64 {
292 fn export_otlp(
293 &self,
294 metrics: &mut Vec<pb::Metric>,
295 name: &str,
296 description: &str,
297 time_unix_nano: u64,
298 ) {
299 metrics.push(pb::Metric {
300 name: name.to_string(),
301 description: description.to_string(),
302 data: Some(pb::metric::Data::Gauge(pb::OtlpGauge {
303 data_points: vec![double_data_point(self.get(), Vec::new(), time_unix_nano)],
304 })),
305 ..Default::default()
306 });
307 }
308}
309
310impl OtlpExport for Histogram {
311 fn export_otlp(
312 &self,
313 metrics: &mut Vec<pb::Metric>,
314 name: &str,
315 description: &str,
316 time_unix_nano: u64,
317 ) {
318 let count = self.count();
319 let sum = self.sum();
320 let (bucket_counts, explicit_bounds) =
321 cumulative_to_otlp_buckets_iter(self.buckets_cumulative_iter());
322
323 metrics.push(pb::Metric {
324 name: name.to_string(),
325 description: description.to_string(),
326 data: Some(pb::metric::Data::Histogram(pb::OtlpHistogram {
327 data_points: vec![pb::HistogramDataPoint {
328 time_unix_nano,
329 count,
330 sum: Some(sum as f64),
331 bucket_counts,
332 explicit_bounds,
333 ..Default::default()
334 }],
335 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
336 })),
337 ..Default::default()
338 });
339 }
340}
341
342impl OtlpExport for SampledTimer {
343 fn export_otlp(
344 &self,
345 metrics: &mut Vec<pb::Metric>,
346 name: &str,
347 description: &str,
348 time_unix_nano: u64,
349 ) {
350 let calls_name = format!("{name}.calls");
351 let samples_name = format!("{name}.samples");
352 let calls_description = format!("{description} total calls");
353 let samples_description = format!("{description} sampled latency in nanoseconds");
354 self.calls_metric()
355 .export_otlp(metrics, &calls_name, &calls_description, time_unix_nano);
356 self.histogram()
357 .export_otlp(metrics, &samples_name, &samples_description, time_unix_nano);
358 }
359}
360
361fn exp_histogram_data_point(
363 snap: &ExpBucketsSnapshot,
364 attributes: Vec<pb::KeyValue>,
365 time_unix_nano: u64,
366) -> pb::ExponentialHistogramDataPoint {
367 let mut first_nonzero: Option<usize> = None;
369 let mut last_nonzero: Option<usize> = None;
370 for (i, &c) in snap.positive.iter().enumerate() {
371 if c > 0 {
372 if first_nonzero.is_none() {
373 first_nonzero = Some(i);
374 }
375 last_nonzero = Some(i);
376 }
377 }
378
379 let positive = match (first_nonzero, last_nonzero) {
380 (Some(first), Some(last)) => {
381 let bucket_counts: Vec<u64> = snap.positive[first..=last].to_vec();
382 Some(pb::exponential_histogram_data_point::Buckets {
383 offset: first as i32,
384 bucket_counts,
385 })
386 }
387 _ => None,
388 };
389
390 pb::ExponentialHistogramDataPoint {
391 attributes,
392 time_unix_nano,
393 count: snap.count,
394 sum: Some(snap.sum as f64),
395 scale: 0, zero_count: snap.zero_count,
397 positive,
398 negative: None, min: snap.min().map(|v| v as f64),
400 max: snap.max().map(|v| v as f64),
401 ..Default::default()
402 }
403}
404
405impl OtlpExport for Distribution {
406 fn export_otlp(
408 &self,
409 metrics: &mut Vec<pb::Metric>,
410 name: &str,
411 description: &str,
412 time_unix_nano: u64,
413 ) {
414 let snap = self.buckets_snapshot();
415 let dp = exp_histogram_data_point(&snap, Vec::new(), time_unix_nano);
416
417 metrics.push(pb::Metric {
418 name: name.to_string(),
419 description: description.to_string(),
420 data: Some(pb::metric::Data::ExponentialHistogram(
421 pb::OtlpExpHistogram {
422 data_points: vec![dp],
423 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
424 },
425 )),
426 ..Default::default()
427 });
428 }
429}
430
431impl<L: LabelEnum> OtlpExport for LabeledCounter<L> {
436 fn export_otlp(
437 &self,
438 metrics: &mut Vec<pb::Metric>,
439 name: &str,
440 description: &str,
441 time_unix_nano: u64,
442 ) {
443 let data_points: Vec<_> = self
444 .iter()
445 .map(|(label, count)| {
446 int_data_point(
447 count as i64,
448 vec![label_to_attribute(label)],
449 time_unix_nano,
450 )
451 })
452 .collect();
453
454 metrics.push(pb::Metric {
455 name: name.to_string(),
456 description: description.to_string(),
457 data: Some(pb::metric::Data::Sum(pb::Sum {
458 data_points,
459 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
460 is_monotonic: false,
461 })),
462 ..Default::default()
463 });
464 }
465}
466
467impl<L: LabelEnum> OtlpExport for LabeledGauge<L> {
468 fn export_otlp(
469 &self,
470 metrics: &mut Vec<pb::Metric>,
471 name: &str,
472 description: &str,
473 time_unix_nano: u64,
474 ) {
475 let data_points: Vec<_> = self
476 .iter()
477 .map(|(label, value)| {
478 int_data_point(value, vec![label_to_attribute(label)], time_unix_nano)
479 })
480 .collect();
481
482 metrics.push(pb::Metric {
483 name: name.to_string(),
484 description: description.to_string(),
485 data: Some(pb::metric::Data::Gauge(pb::OtlpGauge { data_points })),
486 ..Default::default()
487 });
488 }
489}
490
491impl<L: LabelEnum> OtlpExport for LabeledHistogram<L> {
492 fn export_otlp(
493 &self,
494 metrics: &mut Vec<pb::Metric>,
495 name: &str,
496 description: &str,
497 time_unix_nano: u64,
498 ) {
499 let mut data_points = Vec::new();
500
501 for (label, histogram) in self.iter() {
502 let attrs = vec![label_to_attribute(label)];
503 let (bucket_counts, explicit_bounds) =
504 cumulative_to_otlp_buckets_iter(histogram.buckets_cumulative_iter());
505
506 data_points.push(pb::HistogramDataPoint {
507 attributes: attrs,
508 time_unix_nano,
509 count: histogram.count(),
510 sum: Some(histogram.sum() as f64),
511 bucket_counts,
512 explicit_bounds,
513 ..Default::default()
514 });
515 }
516
517 metrics.push(pb::Metric {
518 name: name.to_string(),
519 description: description.to_string(),
520 data: Some(pb::metric::Data::Histogram(pb::OtlpHistogram {
521 data_points,
522 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
523 })),
524 ..Default::default()
525 });
526 }
527}
528
529impl<L: LabelEnum> OtlpExport for LabeledSampledTimer<L> {
530 fn export_otlp(
531 &self,
532 metrics: &mut Vec<pb::Metric>,
533 name: &str,
534 description: &str,
535 time_unix_nano: u64,
536 ) {
537 let calls_name = format!("{name}.calls");
538 let samples_name = format!("{name}.samples");
539 let calls_description = format!("{description} total calls");
540 let samples_description = format!("{description} sampled latency in nanoseconds");
541
542 let mut call_points = Vec::new();
543 let mut sample_points = Vec::new();
544
545 for (label, calls, histogram) in self.iter() {
546 call_points.push(int_data_point(
547 calls.sum() as i64,
548 vec![label_to_attribute(label)],
549 time_unix_nano,
550 ));
551
552 let (bucket_counts, explicit_bounds) =
553 cumulative_to_otlp_buckets_iter(histogram.buckets_cumulative_iter());
554 sample_points.push(pb::HistogramDataPoint {
555 attributes: vec![label_to_attribute(label)],
556 time_unix_nano,
557 count: histogram.count(),
558 sum: Some(histogram.sum() as f64),
559 bucket_counts,
560 explicit_bounds,
561 ..Default::default()
562 });
563 }
564
565 metrics.push(pb::Metric {
566 name: calls_name,
567 description: calls_description,
568 data: Some(pb::metric::Data::Sum(pb::Sum {
569 data_points: call_points,
570 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
571 is_monotonic: false,
572 })),
573 ..Default::default()
574 });
575
576 metrics.push(pb::Metric {
577 name: samples_name,
578 description: samples_description,
579 data: Some(pb::metric::Data::Histogram(pb::OtlpHistogram {
580 data_points: sample_points,
581 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
582 })),
583 ..Default::default()
584 });
585 }
586}
587
588impl OtlpExport for DynamicCounter {
593 fn export_otlp(
594 &self,
595 metrics: &mut Vec<pb::Metric>,
596 name: &str,
597 description: &str,
598 time_unix_nano: u64,
599 ) {
600 let mut data_points = Vec::new();
601 self.visit_series(|pairs, count| {
602 data_points.push(int_data_point(
603 count as i64,
604 pairs_to_attributes(pairs),
605 time_unix_nano,
606 ));
607 });
608
609 if data_points.is_empty() {
610 return;
611 }
612
613 metrics.push(pb::Metric {
614 name: name.to_string(),
615 description: description.to_string(),
616 data: Some(pb::metric::Data::Sum(pb::Sum {
617 data_points,
618 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
619 is_monotonic: false,
620 })),
621 ..Default::default()
622 });
623 }
624}
625
626impl OtlpExport for DynamicGauge {
627 fn export_otlp(
628 &self,
629 metrics: &mut Vec<pb::Metric>,
630 name: &str,
631 description: &str,
632 time_unix_nano: u64,
633 ) {
634 let mut data_points = Vec::new();
635 self.visit_series(|pairs, value| {
636 data_points.push(double_data_point(
637 value,
638 pairs_to_attributes(pairs),
639 time_unix_nano,
640 ));
641 });
642
643 if data_points.is_empty() {
644 return;
645 }
646
647 metrics.push(pb::Metric {
648 name: name.to_string(),
649 description: description.to_string(),
650 data: Some(pb::metric::Data::Gauge(pb::OtlpGauge { data_points })),
651 ..Default::default()
652 });
653 }
654}
655
656impl OtlpExport for DynamicGaugeI64 {
657 fn export_otlp(
658 &self,
659 metrics: &mut Vec<pb::Metric>,
660 name: &str,
661 description: &str,
662 time_unix_nano: u64,
663 ) {
664 let mut data_points = Vec::new();
665 self.visit_series(|pairs, value| {
666 data_points.push(int_data_point(
667 value,
668 pairs_to_attributes(pairs),
669 time_unix_nano,
670 ));
671 });
672
673 if data_points.is_empty() {
674 return;
675 }
676
677 metrics.push(pb::Metric {
678 name: name.to_string(),
679 description: description.to_string(),
680 data: Some(pb::metric::Data::Gauge(pb::OtlpGauge { data_points })),
681 ..Default::default()
682 });
683 }
684}
685
686impl OtlpExport for DynamicHistogram {
687 fn export_otlp(
688 &self,
689 metrics: &mut Vec<pb::Metric>,
690 name: &str,
691 description: &str,
692 time_unix_nano: u64,
693 ) {
694 let mut data_points = Vec::new();
695
696 self.visit_series(|pairs, series| {
697 let (bucket_counts, explicit_bounds) =
698 cumulative_to_otlp_buckets_iter(series.buckets_cumulative_iter());
699
700 data_points.push(pb::HistogramDataPoint {
701 attributes: pairs_to_attributes(pairs),
702 time_unix_nano,
703 count: series.count(),
704 sum: Some(series.sum() as f64),
705 bucket_counts,
706 explicit_bounds,
707 ..Default::default()
708 });
709 });
710
711 if data_points.is_empty() {
712 return;
713 }
714
715 metrics.push(pb::Metric {
716 name: name.to_string(),
717 description: description.to_string(),
718 data: Some(pb::metric::Data::Histogram(pb::OtlpHistogram {
719 data_points,
720 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
721 })),
722 ..Default::default()
723 });
724 }
725}
726
727impl OtlpExport for DynamicDistribution {
728 fn export_otlp(
730 &self,
731 metrics: &mut Vec<pb::Metric>,
732 name: &str,
733 description: &str,
734 time_unix_nano: u64,
735 ) {
736 let mut data_points = Vec::new();
737
738 self.visit_series(|pairs, _count, _sum, snap| {
739 let attrs = pairs_to_attributes(pairs);
740 data_points.push(exp_histogram_data_point(&snap, attrs, time_unix_nano));
741 });
742
743 if data_points.is_empty() {
744 return;
745 }
746
747 metrics.push(pb::Metric {
748 name: name.to_string(),
749 description: description.to_string(),
750 data: Some(pb::metric::Data::ExponentialHistogram(
751 pb::OtlpExpHistogram {
752 data_points,
753 aggregation_temporality: pb::AggregationTemporality::Cumulative as i32,
754 },
755 )),
756 ..Default::default()
757 });
758 }
759}
760
761use crate::span::{CompletedSpan, SpanKind, SpanStatus, SpanValue};
766
767impl CompletedSpan {
768 pub fn to_otlp(&self) -> pb::OtlpSpan {
770 let kind = match self.kind {
771 SpanKind::Internal => pb::OtlpSpanKind::Internal,
772 SpanKind::Server => pb::OtlpSpanKind::Server,
773 SpanKind::Client => pb::OtlpSpanKind::Client,
774 SpanKind::Producer => pb::OtlpSpanKind::Producer,
775 SpanKind::Consumer => pb::OtlpSpanKind::Consumer,
776 };
777
778 let status = match &self.status {
779 SpanStatus::Unset => None,
780 SpanStatus::Ok => Some(pb::OtlpStatus {
781 code: pb::OtlpStatusCode::Ok as i32,
782 message: String::new(),
783 }),
784 SpanStatus::Error { message } => Some(pb::OtlpStatus {
785 code: pb::OtlpStatusCode::Error as i32,
786 message: message.to_string(),
787 }),
788 };
789
790 let attributes: Vec<pb::KeyValue> = self
791 .attributes
792 .iter()
793 .map(|attr| {
794 let value = match &attr.value {
795 SpanValue::String(s) => pb::any_value::Value::StringValue(s.to_string()),
796 SpanValue::I64(v) => pb::any_value::Value::IntValue(*v),
797 SpanValue::F64(v) => pb::any_value::Value::DoubleValue(*v),
798 SpanValue::Bool(v) => pb::any_value::Value::BoolValue(*v),
799 SpanValue::Uuid(u) => pb::any_value::Value::StringValue(u.to_string()),
800 };
801 pb::KeyValue {
802 key: attr.key.to_string(),
803 value: Some(pb::AnyValue { value: Some(value) }),
804 }
805 })
806 .collect();
807
808 let events: Vec<pb::OtlpEvent> = self
809 .events
810 .iter()
811 .map(|evt| {
812 let attrs: Vec<pb::KeyValue> = evt
813 .attributes
814 .iter()
815 .map(|a| {
816 let v = match &a.value {
817 SpanValue::String(s) => {
818 pb::any_value::Value::StringValue(s.to_string())
819 }
820 SpanValue::I64(v) => pb::any_value::Value::IntValue(*v),
821 SpanValue::F64(v) => pb::any_value::Value::DoubleValue(*v),
822 SpanValue::Bool(v) => pb::any_value::Value::BoolValue(*v),
823 SpanValue::Uuid(u) => pb::any_value::Value::StringValue(u.to_string()),
824 };
825 pb::KeyValue {
826 key: a.key.to_string(),
827 value: Some(pb::AnyValue { value: Some(v) }),
828 }
829 })
830 .collect();
831 pb::OtlpEvent {
832 time_unix_nano: evt.time_ns,
833 name: evt.name.to_string(),
834 attributes: attrs,
835 dropped_attributes_count: 0,
836 }
837 })
838 .collect();
839
840 pb::OtlpSpan {
841 trace_id: self.trace_id.as_bytes().to_vec(),
842 span_id: self.span_id.as_bytes().to_vec(),
843 parent_span_id: if self.parent_span_id.is_invalid() {
844 Vec::new()
845 } else {
846 self.parent_span_id.as_bytes().to_vec()
847 },
848 name: self.name.to_string(),
849 kind: kind as i32,
850 start_time_unix_nano: self.start_time_ns,
851 end_time_unix_nano: self.end_time_ns,
852 attributes,
853 events,
854 status,
855 ..Default::default()
856 }
857 }
858}
859
860pub fn build_trace_export_request(
864 resource: &pb::Resource,
865 scope_name: &str,
866 spans: Vec<pb::OtlpSpan>,
867) -> pb::ExportTraceServiceRequest {
868 pb::ExportTraceServiceRequest {
869 resource_spans: vec![pb::ResourceSpans {
870 resource: Some(resource.clone()),
871 scope_spans: vec![pb::ScopeSpans {
872 scope: Some(pb::InstrumentationScope {
873 name: scope_name.to_string(),
874 ..Default::default()
875 }),
876 spans,
877 ..Default::default()
878 }],
879 ..Default::default()
880 }],
881 }
882}
883
884#[cfg(test)]
885mod tests {
886 use super::*;
887
/// Fixed timestamp (in nanoseconds) shared by the export tests.
fn test_timestamp() -> u64 {
    const FIXED_NANOS: u64 = 1_000_000_000;
    FIXED_NANOS
}
891
/// A counter exports one cumulative, non-monotonic Sum point with its total.
#[test]
fn test_counter_otlp() {
    let counter = Counter::new(4);
    counter.add(42);

    let ts = test_timestamp();
    let mut metrics = Vec::new();
    counter.export_otlp(&mut metrics, "test_counter", "A test counter", ts);

    assert_eq!(metrics.len(), 1);
    let metric = &metrics[0];
    assert_eq!(metric.name, "test_counter");
    assert_eq!(metric.description, "A test counter");

    let data = metric.data.as_ref().expect("missing data");
    let pb::metric::Data::Sum(sum) = data else {
        panic!("expected Sum, got {:?}", data)
    };

    assert!(!sum.is_monotonic);
    assert_eq!(
        sum.aggregation_temporality,
        pb::AggregationTemporality::Cumulative as i32
    );
    assert_eq!(sum.data_points.len(), 1);
    let point = &sum.data_points[0];
    assert_eq!(
        point.value,
        Some(pb::number_data_point::Value::AsInt(42))
    );
    assert_eq!(point.time_unix_nano, test_timestamp());
}
928
/// An integer gauge exports one Gauge point holding the value that was set.
#[test]
fn test_gauge_otlp() {
    let gauge = Gauge::new();
    gauge.set(-10);

    let mut metrics = Vec::new();
    gauge.export_otlp(&mut metrics, "test_gauge", "A test gauge", test_timestamp());
    assert_eq!(metrics.len(), 1);

    let data = metrics[0].data.as_ref().expect("missing data");
    let pb::metric::Data::Gauge(g) = data else {
        panic!("expected Gauge, got {:?}", data)
    };
    assert_eq!(g.data_points.len(), 1);
    let expected = Some(pb::number_data_point::Value::AsInt(-10));
    assert_eq!(g.data_points[0].value, expected);
}
949
/// A floating-point gauge exports one Gauge point with an `AsDouble` value.
#[test]
fn test_gauge_f64_otlp() {
    let gauge = GaugeF64::new();
    gauge.set(3.125);

    let mut metrics = Vec::new();
    gauge.export_otlp(&mut metrics, "test_gauge_f64", "", test_timestamp());

    let data = metrics[0].data.as_ref().expect("missing data");
    let pb::metric::Data::Gauge(g) = data else {
        panic!("expected Gauge, got {:?}", data)
    };
    assert_eq!(g.data_points.len(), 1);

    let value = &g.data_points[0].value;
    let Some(pb::number_data_point::Value::AsDouble(v)) = value else {
        panic!("expected AsDouble, got {:?}", value)
    };
    assert!((*v - 3.125).abs() < 1e-10);
}
971
/// A histogram with bounds [10, 100] and one sample per bucket exports
/// per-bucket counts, finite bounds, count, and sum.
#[test]
fn test_histogram_otlp() {
    let h = Histogram::new(&[10, 100], 4);
    for sample in [5, 50, 500] {
        h.record(sample);
    }

    let mut metrics = Vec::new();
    h.export_otlp(
        &mut metrics,
        "test_hist",
        "A test histogram",
        test_timestamp(),
    );
    assert_eq!(metrics.len(), 1);

    let data = metrics[0].data.as_ref().expect("missing data");
    let pb::metric::Data::Histogram(hist) = data else {
        panic!("expected Histogram, got {:?}", data)
    };
    assert_eq!(
        hist.aggregation_temporality,
        pb::AggregationTemporality::Cumulative as i32
    );
    assert_eq!(hist.data_points.len(), 1);

    let dp = &hist.data_points[0];
    assert_eq!(dp.count, 3);
    assert_eq!(dp.sum, Some(555.0));
    assert_eq!(dp.explicit_bounds, vec![10.0, 100.0]);
    assert_eq!(dp.bucket_counts, vec![1, 1, 1]);
    assert_eq!(dp.time_unix_nano, test_timestamp());
}
1006
/// A distribution exports one cumulative exponential-histogram point with
/// scale 0 and non-empty positive buckets.
#[test]
fn test_distribution_otlp() {
    let dist = Distribution::new(4);
    for sample in [100, 200, 300] {
        dist.record(sample);
    }

    let mut metrics = Vec::new();
    dist.export_otlp(
        &mut metrics,
        "test_dist",
        "A distribution",
        test_timestamp(),
    );

    assert_eq!(metrics.len(), 1);
    assert_eq!(metrics[0].name, "test_dist");

    let data = metrics[0].data.as_ref().expect("missing data");
    let pb::metric::Data::ExponentialHistogram(hist) = data else {
        panic!("expected ExponentialHistogram, got {:?}", data)
    };
    assert_eq!(
        hist.aggregation_temporality,
        pb::AggregationTemporality::Cumulative as i32
    );
    assert_eq!(hist.data_points.len(), 1);

    let dp = &hist.data_points[0];
    assert_eq!(dp.count, 3);
    assert_eq!(dp.sum, Some(600.0));
    assert_eq!(dp.scale, 0);
    assert_eq!(dp.zero_count, 0);
    assert_eq!(dp.time_unix_nano, test_timestamp());
    assert!(dp.positive.is_some());
    let positive = dp.positive.as_ref().expect("positive buckets");
    assert!(!positive.bucket_counts.is_empty());
}
1048
/// Two distinct label sets on a dynamic counter export two Sum points,
/// each carrying both labels as attributes.
#[test]
fn test_dynamic_counter_otlp() {
    let counter = DynamicCounter::new(4);
    counter.add(&[("env", "prod"), ("region", "us")], 10);
    counter.add(&[("env", "staging"), ("region", "eu")], 5);

    let mut metrics = Vec::new();
    counter.export_otlp(&mut metrics, "requests", "Request count", test_timestamp());
    assert_eq!(metrics.len(), 1);

    let data = metrics[0].data.as_ref().expect("missing data");
    let pb::metric::Data::Sum(sum) = data else {
        panic!("expected Sum, got {:?}", data)
    };
    assert!(!sum.is_monotonic);
    assert_eq!(sum.data_points.len(), 2);
    for dp in sum.data_points.iter() {
        assert_eq!(dp.attributes.len(), 2);
    }
}
1070
/// The export request wraps the metrics in exactly one resource and one
/// scope, with `service.name` as the first resource attribute.
#[test]
fn test_build_export_request() {
    let counter = Counter::new(4);
    counter.add(1);
    let mut metrics = Vec::new();
    counter.export_otlp(&mut metrics, "my_counter", "", test_timestamp());

    let resource = build_resource("test-service", &[("version", "1.0")]);
    let request = build_export_request(&resource, "fast-telemetry", metrics);

    assert_eq!(request.resource_metrics.len(), 1);
    let resource_metrics = &request.resource_metrics[0];

    let exported_resource = resource_metrics
        .resource
        .as_ref()
        .expect("missing resource");
    assert_eq!(exported_resource.attributes.len(), 2);
    assert_eq!(exported_resource.attributes[0].key, "service.name");

    assert_eq!(resource_metrics.scope_metrics.len(), 1);
    let scope_metrics = &resource_metrics.scope_metrics[0];
    let scope = scope_metrics.scope.as_ref().expect("missing scope");
    assert_eq!(scope.name, "fast-telemetry");
    assert_eq!(scope_metrics.metrics.len(), 1);
}
1094
/// `make_kv` stores the key verbatim and wraps the value as a string value.
#[test]
fn test_make_kv() {
    let kv = make_kv("foo", "bar");
    assert_eq!(kv.key, "foo");

    let any = kv.value.expect("missing value");
    let inner = any.value.expect("missing inner");
    let pb::any_value::Value::StringValue(s) = &inner else {
        panic!("expected StringValue, got {:?}", inner)
    };
    assert_eq!(s.as_str(), "bar");
}
1109
/// Three-variant label enum used by the labeled-metric export tests.
#[derive(Copy, Clone, Debug)]
enum TestLabel {
    A,
    B,
    C,
}
1118
1119 impl crate::LabelEnum for TestLabel {
1120 const CARDINALITY: usize = 3;
1121 const LABEL_NAME: &'static str = "test";
1122
1123 fn as_index(self) -> usize {
1124 self as usize
1125 }
1126 fn from_index(index: usize) -> Self {
1127 match index {
1128 0 => Self::A,
1129 1 => Self::B,
1130 _ => Self::C,
1131 }
1132 }
1133 fn variant_name(self) -> &'static str {
1134 match self {
1135 Self::A => "a",
1136 Self::B => "b",
1137 Self::C => "c",
1138 }
1139 }
1140 }
1141
/// A labeled counter exports one Sum point per label variant; the point
/// tagged `test = "a"` carries the count added for `A`.
#[test]
fn test_labeled_counter_otlp() {
    /// True when the attribute is `test = "a"`.
    fn is_label_a(kv: &pb::KeyValue) -> bool {
        kv.key == "test"
            && matches!(
                kv.value.as_ref().and_then(|v| v.value.as_ref()),
                Some(pb::any_value::Value::StringValue(s)) if s == "a"
            )
    }

    let counter = LabeledCounter::<TestLabel>::new(4);
    counter.add(TestLabel::A, 10);
    counter.add(TestLabel::B, 20);

    let mut metrics = Vec::new();
    counter.export_otlp(
        &mut metrics,
        "labeled_counter",
        "By label",
        test_timestamp(),
    );
    assert_eq!(metrics.len(), 1);

    let data = metrics[0].data.as_ref().expect("missing data");
    let pb::metric::Data::Sum(sum) = data else {
        panic!("expected Sum, got {:?}", data)
    };
    assert!(!sum.is_monotonic);
    assert_eq!(sum.data_points.len(), 3);

    let dp_a = sum
        .data_points
        .iter()
        .find(|dp| dp.attributes.iter().any(is_label_a))
        .expect("missing data point for label A");
    assert_eq!(dp_a.value, Some(pb::number_data_point::Value::AsInt(10)));
}
1170
/// A labeled gauge exports one Gauge point per label variant.
#[test]
fn test_labeled_gauge_otlp() {
    let gauge = LabeledGauge::<TestLabel>::new();
    gauge.set(TestLabel::A, 42);
    gauge.set(TestLabel::C, -5);

    let mut metrics = Vec::new();
    gauge.export_otlp(&mut metrics, "labeled_gauge", "By label", test_timestamp());
    assert_eq!(metrics.len(), 1);

    let data = metrics[0].data.as_ref().expect("missing data");
    let pb::metric::Data::Gauge(g) = data else {
        panic!("expected Gauge, got {:?}", data)
    };
    assert_eq!(g.data_points.len(), 3);
}
1188
/// A labeled histogram exports one cumulative Histogram point per label
/// variant, each tagged with the single `test` attribute.
#[test]
fn test_labeled_histogram_otlp() {
    let h = LabeledHistogram::<TestLabel>::new(&[10, 100], 4);
    for (label, sample) in [(TestLabel::A, 5), (TestLabel::A, 50), (TestLabel::B, 500)] {
        h.record(label, sample);
    }

    let mut metrics = Vec::new();
    h.export_otlp(&mut metrics, "labeled_hist", "By label", test_timestamp());
    assert_eq!(metrics.len(), 1);

    let data = metrics[0].data.as_ref().expect("missing data");
    let pb::metric::Data::Histogram(hist) = data else {
        panic!("expected Histogram, got {:?}", data)
    };
    assert_eq!(
        hist.aggregation_temporality,
        pb::AggregationTemporality::Cumulative as i32
    );
    assert_eq!(hist.data_points.len(), 3);
    for dp in hist.data_points.iter() {
        assert_eq!(dp.attributes.len(), 1);
        assert_eq!(dp.attributes[0].key, "test");
        assert_eq!(dp.time_unix_nano, test_timestamp());
    }
}
1217
/// Two label sets on a dynamic f64 gauge export two `AsDouble` Gauge points
/// with one attribute each.
#[test]
fn test_dynamic_gauge_otlp() {
    let gauge = DynamicGauge::new(4);
    gauge.set(&[("host", "node1")], 3.125);
    gauge.set(&[("host", "node2")], 2.72);

    let mut metrics = Vec::new();
    gauge.export_otlp(
        &mut metrics,
        "cpu_usage",
        "CPU percentage",
        test_timestamp(),
    );
    assert_eq!(metrics.len(), 1);

    let data = metrics[0].data.as_ref().expect("missing data");
    let pb::metric::Data::Gauge(g) = data else {
        panic!("expected Gauge, got {:?}", data)
    };
    assert_eq!(g.data_points.len(), 2);
    for dp in g.data_points.iter() {
        assert_eq!(dp.attributes.len(), 1);
        assert!(matches!(
            dp.value,
            Some(pb::number_data_point::Value::AsDouble(_))
        ));
    }
}
1249
/// Two label sets on a dynamic i64 gauge export two `AsInt` Gauge points
/// with one attribute each.
#[test]
fn test_dynamic_gauge_i64_otlp() {
    let gauge = DynamicGaugeI64::new(4);
    gauge.set(&[("region", "us")], 100);
    gauge.set(&[("region", "eu")], 200);

    let mut metrics = Vec::new();
    gauge.export_otlp(
        &mut metrics,
        "connections",
        "Active connections",
        test_timestamp(),
    );
    assert_eq!(metrics.len(), 1);

    let data = metrics[0].data.as_ref().expect("missing data");
    let pb::metric::Data::Gauge(g) = data else {
        panic!("expected Gauge, got {:?}", data)
    };
    assert_eq!(g.data_points.len(), 2);
    for dp in g.data_points.iter() {
        assert_eq!(dp.attributes.len(), 1);
        assert!(matches!(
            dp.value,
            Some(pb::number_data_point::Value::AsInt(_))
        ));
    }
}
1279
/// Exports a `DynamicHistogram` with two distinct label sets and checks the
/// resulting OTLP histogram points, including their explicit bounds.
#[test]
fn test_dynamic_histogram_otlp() {
    let latency = DynamicHistogram::new(&[10, 100, 1000], 4);
    latency.record(&[("endpoint", "/api")], 5);
    latency.record(&[("endpoint", "/api")], 50);
    latency.record(&[("endpoint", "/health")], 500);

    let mut exported = Vec::new();
    latency.export_otlp(&mut exported, "latency", "Request latency", test_timestamp());

    assert_eq!(exported.len(), 1);

    let data = exported[0].data.as_ref().expect("missing data");
    let pb::metric::Data::Histogram(otlp_hist) = data else {
        panic!("expected Histogram, got {:?}", data);
    };

    assert_eq!(
        otlp_hist.aggregation_temporality,
        pb::AggregationTemporality::Cumulative as i32
    );

    // Two distinct label sets were recorded: "/api" and "/health".
    assert_eq!(otlp_hist.data_points.len(), 2);

    for point in &otlp_hist.data_points {
        assert_eq!(point.attributes.len(), 1);
        assert_eq!(point.attributes[0].key, "endpoint");
        assert_eq!(point.time_unix_nano, test_timestamp());
        // The configured integer bounds are expected back as floats.
        assert_eq!(point.explicit_bounds, vec![10.0, 100.0, 1000.0]);
    }
}
1309
/// Exports a `DynamicDistribution` with two distinct label sets and checks
/// that it is emitted as an OTLP exponential histogram.
#[test]
fn test_dynamic_distribution_otlp() {
    let sizes = DynamicDistribution::new(4);
    sizes.record(&[("method", "GET")], 100);
    sizes.record(&[("method", "GET")], 200);
    sizes.record(&[("method", "POST")], 300);

    let mut exported = Vec::new();
    sizes.export_otlp(
        &mut exported,
        "response_size",
        "Size in bytes",
        test_timestamp(),
    );

    assert_eq!(exported.len(), 1);
    assert_eq!(exported[0].name, "response_size");

    let data = exported[0].data.as_ref().expect("missing data");
    let pb::metric::Data::ExponentialHistogram(exp_hist) = data else {
        panic!("expected ExponentialHistogram, got {:?}", data);
    };

    assert_eq!(
        exp_hist.aggregation_temporality,
        pb::AggregationTemporality::Cumulative as i32
    );

    // Two distinct label sets were recorded: GET and POST.
    assert_eq!(exp_hist.data_points.len(), 2);

    for point in &exp_hist.data_points {
        assert_eq!(point.attributes.len(), 1);
        assert_eq!(point.attributes[0].key, "method");
        assert_eq!(point.scale, 0);
        assert!(point.positive.is_some());
    }
}
1345
/// Dynamic metrics that never received a value must not add anything to the
/// export buffer.
#[test]
fn test_empty_dynamic_metrics_produce_nothing() {
    let empty_counter = DynamicCounter::new(4);
    let empty_gauge = DynamicGauge::new(4);
    let empty_gauge_i64 = DynamicGaugeI64::new(4);
    let empty_histogram = DynamicHistogram::new(&[10], 4);
    let empty_distribution = DynamicDistribution::new(4);

    let mut exported = Vec::new();
    let now = test_timestamp();

    // Every export targets the same buffer, so one emptiness check covers all.
    empty_counter.export_otlp(&mut exported, "c", "", now);
    empty_gauge.export_otlp(&mut exported, "g", "", now);
    empty_gauge_i64.export_otlp(&mut exported, "gi", "", now);
    empty_histogram.export_otlp(&mut exported, "h", "", now);
    empty_distribution.export_otlp(&mut exported, "d", "", now);

    assert!(
        exported.is_empty(),
        "empty dynamic metrics should produce no output"
    );
}
1367
/// Checks that cumulative `(bound, count)` pairs are converted into
/// per-bucket counts plus the list of finite bounds.
#[test]
fn test_cumulative_to_otlp_buckets_helper() {
    // Cumulative counts 1, 3, 5 correspond to per-bucket deltas 1, 2, 2.
    // The `u64::MAX` entry contributes a count but no explicit bound.
    let (bucket_counts, explicit_bounds) =
        cumulative_to_otlp_buckets_iter(vec![(10, 1), (100, 3), (u64::MAX, 5)]);

    assert_eq!(bucket_counts, vec![1, 2, 2]);
    assert_eq!(explicit_bounds, vec![10.0, 100.0]);
}
1377
/// Converts a fully populated `CompletedSpan` to OTLP and checks ids, name,
/// kind, timestamps, status, attributes and events.
#[test]
fn test_completed_span_to_otlp() {
    use crate::span::{SpanAttribute, SpanEvent, SpanId, SpanKind, SpanStatus, TraceId};

    // The hex ids given to the span, spelled out as the raw bytes expected
    // in the OTLP output.
    let expected_trace_id: [u8; 16] = [
        0x4b, 0xf9, 0x2f, 0x35, 0x77, 0xb3, 0x4d, 0xa6, 0xa3, 0xce, 0x92, 0x9d, 0x0e, 0x0e, 0x47,
        0x36,
    ];
    let expected_span_id: [u8; 8] = [0x00, 0xf0, 0x67, 0xaa, 0x0b, 0xa9, 0x02, 0xb7];
    let expected_parent_id: [u8; 8] = [0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef];

    let span = CompletedSpan {
        trace_id: TraceId::from_hex("4bf92f3577b34da6a3ce929d0e0e4736").unwrap(),
        span_id: SpanId::from_hex("00f067aa0ba902b7").unwrap(),
        parent_span_id: SpanId::from_hex("1234567890abcdef").unwrap(),
        name: "test_operation".into(),
        kind: SpanKind::Server,
        start_time_ns: 1_000_000_000,
        end_time_ns: 2_000_000_000,
        status: SpanStatus::Ok,
        attributes: vec![
            SpanAttribute::new("http.method", "GET"),
            SpanAttribute::new("http.status_code", 200i64),
        ],
        events: vec![SpanEvent {
            name: "processing".into(),
            time_ns: 1_500_000_000,
            attributes: vec![SpanAttribute::new("step", "validate")],
        }],
    };

    let exported = span.to_otlp();

    // Identity and timing.
    assert_eq!(exported.trace_id, &expected_trace_id);
    assert_eq!(exported.span_id, &expected_span_id);
    assert_eq!(exported.parent_span_id, &expected_parent_id);
    assert_eq!(exported.name, "test_operation");
    assert_eq!(exported.kind, pb::OtlpSpanKind::Server as i32);
    assert_eq!(exported.start_time_unix_nano, 1_000_000_000);
    assert_eq!(exported.end_time_unix_nano, 2_000_000_000);

    // Status.
    let otlp_status = exported.status.unwrap();
    assert_eq!(otlp_status.code, pb::OtlpStatusCode::Ok as i32);

    // Attributes keep their insertion order.
    assert_eq!(exported.attributes.len(), 2);
    assert_eq!(exported.attributes[0].key, "http.method");
    assert_eq!(exported.attributes[1].key, "http.status_code");

    // Events.
    assert_eq!(exported.events.len(), 1);
    let event = &exported.events[0];
    assert_eq!(event.name, "processing");
    assert_eq!(event.time_unix_nano, 1_500_000_000);
    assert_eq!(event.attributes.len(), 1);
}
1442
/// A span whose parent is `SpanId::INVALID` and whose status is `Unset` must
/// export with an empty parent id and no status.
#[test]
fn test_completed_span_root_has_empty_parent() {
    use crate::span::{SpanId, TraceId};

    let root_span = CompletedSpan {
        trace_id: TraceId::random(),
        span_id: SpanId::random(),
        parent_span_id: SpanId::INVALID,
        name: "root".into(),
        kind: SpanKind::Server,
        start_time_ns: 1_000_000_000,
        end_time_ns: 2_000_000_000,
        status: SpanStatus::Unset,
        attributes: Vec::new(),
        events: Vec::new(),
    };

    let exported = root_span.to_otlp();

    assert!(
        exported.parent_span_id.is_empty(),
        "root span should have empty parent_span_id"
    );
    assert!(exported.status.is_none(), "Unset status should map to None");
}
1467
/// A span with `SpanStatus::Error` must export the OTLP error code together
/// with the original message.
#[test]
fn test_completed_span_error_status() {
    use crate::span::{SpanId, TraceId};

    let failed_span = CompletedSpan {
        trace_id: TraceId::random(),
        span_id: SpanId::random(),
        parent_span_id: SpanId::INVALID,
        name: "failing_op".into(),
        kind: SpanKind::Internal,
        start_time_ns: 1_000_000_000,
        end_time_ns: 2_000_000_000,
        status: SpanStatus::Error {
            message: "connection refused".into(),
        },
        attributes: Vec::new(),
        events: Vec::new(),
    };

    let otlp_status = failed_span.to_otlp().status.unwrap();

    assert_eq!(otlp_status.code, pb::OtlpStatusCode::Error as i32);
    assert_eq!(otlp_status.message, "connection refused");
}
1492
/// Wraps one exported span in a trace export request and checks the
/// resource / scope / span nesting of the result.
#[test]
fn test_build_trace_export_request() {
    use crate::span::{SpanId, TraceId};

    let resource = build_resource("test-service", &[("version", "1.0")]);
    let span = CompletedSpan {
        trace_id: TraceId::random(),
        span_id: SpanId::random(),
        parent_span_id: SpanId::INVALID,
        name: "test".into(),
        kind: SpanKind::Server,
        start_time_ns: 1_000_000_000,
        end_time_ns: 2_000_000_000,
        status: SpanStatus::Ok,
        attributes: Vec::new(),
        events: Vec::new(),
    };

    let spans = vec![span.to_otlp()];
    let request = build_trace_export_request(&resource, "fast-telemetry", spans);

    // Exactly one resource entry.
    assert_eq!(request.resource_spans.len(), 1);
    let resource_spans = &request.resource_spans[0];

    // Presumably the service name plus the one extra "version" pair —
    // confirm against `build_resource`.
    let exported_resource = resource_spans.resource.as_ref().unwrap();
    assert_eq!(exported_resource.attributes.len(), 2);

    // One scope, named after the instrumentation library, holding the span.
    assert_eq!(resource_spans.scope_spans.len(), 1);
    let scope_spans = &resource_spans.scope_spans[0];
    let scope = scope_spans.scope.as_ref().unwrap();
    assert_eq!(scope.name, "fast-telemetry");
    assert_eq!(scope_spans.spans.len(), 1);
}
1525}