1use crate::exp_buckets::ExpBucketsSnapshot;
9use crate::{
10 Counter, Distribution, DynamicCounter, DynamicDistribution, DynamicGauge, DynamicGaugeI64,
11 DynamicHistogram, Gauge, GaugeF64, Histogram, LabelEnum, LabeledCounter, LabeledGauge,
12 LabeledHistogram, LabeledSampledTimer, MaxGauge, MaxGaugeF64, MinGauge, MinGaugeF64,
13 SampledTimer,
14};
15use indexmap::IndexMap;
16use klickhouse::{DateTime64, Tz};
17
18const AGGREGATION_TEMPORALITY_CUMULATIVE: i32 = 2;
19
20pub trait ClickHouseExport {
22 fn export_clickhouse(
23 &self,
24 batch: &mut ClickHouseMetricBatch,
25 name: &str,
26 description: &str,
27 time_unix_nano: u64,
28 );
29}
30
31#[derive(Debug)]
33pub struct ClickHouseMetricBatch {
34 resource_attributes: IndexMap<String, String>,
35 service_name: String,
36 scope_name: String,
37 pub sums: Vec<SumRow>,
38 pub gauges: Vec<GaugeRow>,
39 pub histograms: Vec<HistogramRow>,
40 pub exp_histograms: Vec<ExpHistogramRow>,
41}
42
43impl ClickHouseMetricBatch {
44 pub fn new(service_name: impl Into<String>) -> Self {
45 Self::with_scope(service_name, "fast-telemetry")
46 }
47
48 pub fn with_scope(service_name: impl Into<String>, scope_name: impl Into<String>) -> Self {
49 let service_name = service_name.into();
50 let mut resource_attributes = IndexMap::new();
51 resource_attributes.insert("service.name".to_string(), service_name.clone());
52 Self {
53 resource_attributes,
54 service_name,
55 scope_name: scope_name.into(),
56 sums: Vec::new(),
57 gauges: Vec::new(),
58 histograms: Vec::new(),
59 exp_histograms: Vec::new(),
60 }
61 }
62
63 pub fn with_resource_attribute(
64 mut self,
65 key: impl Into<String>,
66 value: impl Into<String>,
67 ) -> Self {
68 self.resource_attributes.insert(key.into(), value.into());
69 self
70 }
71
72 pub fn clear(&mut self) {
73 self.sums.clear();
74 self.gauges.clear();
75 self.histograms.clear();
76 self.exp_histograms.clear();
77 }
78
79 pub fn total_rows(&self) -> usize {
80 self.sums.len() + self.gauges.len() + self.histograms.len() + self.exp_histograms.len()
81 }
82
83 fn push_sum(
84 &mut self,
85 name: &str,
86 description: &str,
87 attrs: IndexMap<String, String>,
88 value: f64,
89 is_monotonic: bool,
90 time_unix_nano: u64,
91 ) {
92 self.sums.push(SumRow {
93 ResourceAttributes: self.resource_attributes.clone(),
94 ResourceSchemaUrl: String::new(),
95 ServiceName: self.service_name.clone(),
96 ScopeName: self.scope_name.clone(),
97 ScopeVersion: String::new(),
98 ScopeAttributes: IndexMap::new(),
99 ScopeDroppedAttrCount: 0,
100 ScopeSchemaUrl: String::new(),
101 MetricName: name.to_string(),
102 MetricDescription: description.to_string(),
103 MetricUnit: String::new(),
104 Attributes: attrs,
105 StartTimeUnix: DateTime64::<9>(Tz::UTC, 0),
106 TimeUnix: DateTime64::<9>(Tz::UTC, time_unix_nano),
107 Value: value,
108 Flags: 0,
109 AggregationTemporality: AGGREGATION_TEMPORALITY_CUMULATIVE,
110 IsMonotonic: is_monotonic,
111 });
112 }
113
114 fn push_gauge(
115 &mut self,
116 name: &str,
117 description: &str,
118 attrs: IndexMap<String, String>,
119 value: f64,
120 time_unix_nano: u64,
121 ) {
122 self.gauges.push(GaugeRow {
123 ResourceAttributes: self.resource_attributes.clone(),
124 ResourceSchemaUrl: String::new(),
125 ServiceName: self.service_name.clone(),
126 ScopeName: self.scope_name.clone(),
127 ScopeVersion: String::new(),
128 ScopeAttributes: IndexMap::new(),
129 ScopeDroppedAttrCount: 0,
130 ScopeSchemaUrl: String::new(),
131 MetricName: name.to_string(),
132 MetricDescription: description.to_string(),
133 MetricUnit: String::new(),
134 Attributes: attrs,
135 StartTimeUnix: DateTime64::<9>(Tz::UTC, 0),
136 TimeUnix: DateTime64::<9>(Tz::UTC, time_unix_nano),
137 Value: value,
138 Flags: 0,
139 });
140 }
141
142 fn push_histogram(
143 &mut self,
144 name: &str,
145 description: &str,
146 attrs: IndexMap<String, String>,
147 histogram: &Histogram,
148 time_unix_nano: u64,
149 ) {
150 let (bucket_counts, explicit_bounds) =
151 cumulative_to_delta_buckets(histogram.buckets_cumulative_iter());
152 self.histograms.push(HistogramRow {
153 ResourceAttributes: self.resource_attributes.clone(),
154 ResourceSchemaUrl: String::new(),
155 ServiceName: self.service_name.clone(),
156 ScopeName: self.scope_name.clone(),
157 ScopeVersion: String::new(),
158 ScopeAttributes: IndexMap::new(),
159 ScopeDroppedAttrCount: 0,
160 ScopeSchemaUrl: String::new(),
161 MetricName: name.to_string(),
162 MetricDescription: description.to_string(),
163 MetricUnit: String::new(),
164 Attributes: attrs,
165 StartTimeUnix: DateTime64::<9>(Tz::UTC, 0),
166 TimeUnix: DateTime64::<9>(Tz::UTC, time_unix_nano),
167 Count: histogram.count(),
168 Sum: histogram.sum() as f64,
169 BucketCounts: bucket_counts,
170 ExplicitBounds: explicit_bounds,
171 Flags: 0,
172 Min: 0.0,
173 Max: 0.0,
174 AggregationTemporality: AGGREGATION_TEMPORALITY_CUMULATIVE,
175 });
176 }
177
178 fn push_histogram_parts<I>(
179 &mut self,
180 name: &str,
181 description: &str,
182 parts: HistogramParts<I>,
183 time_unix_nano: u64,
184 ) where
185 I: IntoIterator<Item = (u64, u64)>,
186 {
187 let (bucket_counts, explicit_bounds) =
188 cumulative_to_delta_buckets(parts.buckets_cumulative);
189 self.histograms.push(HistogramRow {
190 ResourceAttributes: self.resource_attributes.clone(),
191 ResourceSchemaUrl: String::new(),
192 ServiceName: self.service_name.clone(),
193 ScopeName: self.scope_name.clone(),
194 ScopeVersion: String::new(),
195 ScopeAttributes: IndexMap::new(),
196 ScopeDroppedAttrCount: 0,
197 ScopeSchemaUrl: String::new(),
198 MetricName: name.to_string(),
199 MetricDescription: description.to_string(),
200 MetricUnit: String::new(),
201 Attributes: parts.attrs,
202 StartTimeUnix: DateTime64::<9>(Tz::UTC, 0),
203 TimeUnix: DateTime64::<9>(Tz::UTC, time_unix_nano),
204 Count: parts.count,
205 Sum: parts.sum as f64,
206 BucketCounts: bucket_counts,
207 ExplicitBounds: explicit_bounds,
208 Flags: 0,
209 Min: 0.0,
210 Max: 0.0,
211 AggregationTemporality: AGGREGATION_TEMPORALITY_CUMULATIVE,
212 });
213 }
214
215 fn push_distribution(
216 &mut self,
217 name: &str,
218 description: &str,
219 attrs: IndexMap<String, String>,
220 snap: &ExpBucketsSnapshot,
221 time_unix_nano: u64,
222 ) {
223 let (positive_offset, positive_counts) = compact_positive_buckets(snap);
224 self.exp_histograms.push(ExpHistogramRow {
225 ResourceAttributes: self.resource_attributes.clone(),
226 ResourceSchemaUrl: String::new(),
227 ServiceName: self.service_name.clone(),
228 ScopeName: self.scope_name.clone(),
229 ScopeVersion: String::new(),
230 ScopeAttributes: IndexMap::new(),
231 ScopeDroppedAttrCount: 0,
232 ScopeSchemaUrl: String::new(),
233 MetricName: name.to_string(),
234 MetricDescription: description.to_string(),
235 MetricUnit: String::new(),
236 Attributes: attrs,
237 StartTimeUnix: DateTime64::<9>(Tz::UTC, 0),
238 TimeUnix: DateTime64::<9>(Tz::UTC, time_unix_nano),
239 Count: snap.count,
240 Sum: snap.sum as f64,
241 Scale: 0,
242 ZeroCount: snap.zero_count,
243 PositiveOffset: positive_offset,
244 PositiveBucketCounts: positive_counts,
245 NegativeOffset: 0,
246 NegativeBucketCounts: Vec::new(),
247 Flags: 0,
248 Min: snap.min().map(|v| v as f64).unwrap_or(0.0),
249 Max: snap.max().map(|v| v as f64).unwrap_or(0.0),
250 AggregationTemporality: AGGREGATION_TEMPORALITY_CUMULATIVE,
251 });
252 }
253}
254
255struct HistogramParts<I> {
256 attrs: IndexMap<String, String>,
257 buckets_cumulative: I,
258 sum: u64,
259 count: u64,
260}
261
262#[allow(non_snake_case)]
263#[derive(klickhouse::Row, Debug)]
264pub struct SumRow {
265 pub ResourceAttributes: IndexMap<String, String>,
266 pub ResourceSchemaUrl: String,
267 pub ServiceName: String,
268 pub ScopeName: String,
269 pub ScopeVersion: String,
270 pub ScopeAttributes: IndexMap<String, String>,
271 pub ScopeDroppedAttrCount: u32,
272 pub ScopeSchemaUrl: String,
273 pub MetricName: String,
274 pub MetricDescription: String,
275 pub MetricUnit: String,
276 pub Attributes: IndexMap<String, String>,
277 pub StartTimeUnix: DateTime64<9>,
278 pub TimeUnix: DateTime64<9>,
279 pub Value: f64,
280 pub Flags: u32,
281 pub AggregationTemporality: i32,
282 pub IsMonotonic: bool,
283}
284
285#[allow(non_snake_case)]
286#[derive(klickhouse::Row, Debug)]
287pub struct GaugeRow {
288 pub ResourceAttributes: IndexMap<String, String>,
289 pub ResourceSchemaUrl: String,
290 pub ServiceName: String,
291 pub ScopeName: String,
292 pub ScopeVersion: String,
293 pub ScopeAttributes: IndexMap<String, String>,
294 pub ScopeDroppedAttrCount: u32,
295 pub ScopeSchemaUrl: String,
296 pub MetricName: String,
297 pub MetricDescription: String,
298 pub MetricUnit: String,
299 pub Attributes: IndexMap<String, String>,
300 pub StartTimeUnix: DateTime64<9>,
301 pub TimeUnix: DateTime64<9>,
302 pub Value: f64,
303 pub Flags: u32,
304}
305
306#[allow(non_snake_case)]
307#[derive(klickhouse::Row, Debug)]
308pub struct HistogramRow {
309 pub ResourceAttributes: IndexMap<String, String>,
310 pub ResourceSchemaUrl: String,
311 pub ServiceName: String,
312 pub ScopeName: String,
313 pub ScopeVersion: String,
314 pub ScopeAttributes: IndexMap<String, String>,
315 pub ScopeDroppedAttrCount: u32,
316 pub ScopeSchemaUrl: String,
317 pub MetricName: String,
318 pub MetricDescription: String,
319 pub MetricUnit: String,
320 pub Attributes: IndexMap<String, String>,
321 pub StartTimeUnix: DateTime64<9>,
322 pub TimeUnix: DateTime64<9>,
323 pub Count: u64,
324 pub Sum: f64,
325 pub BucketCounts: Vec<u64>,
326 pub ExplicitBounds: Vec<f64>,
327 pub Flags: u32,
328 pub Min: f64,
329 pub Max: f64,
330 pub AggregationTemporality: i32,
331}
332
333#[allow(non_snake_case)]
334#[derive(klickhouse::Row, Debug)]
335pub struct ExpHistogramRow {
336 pub ResourceAttributes: IndexMap<String, String>,
337 pub ResourceSchemaUrl: String,
338 pub ServiceName: String,
339 pub ScopeName: String,
340 pub ScopeVersion: String,
341 pub ScopeAttributes: IndexMap<String, String>,
342 pub ScopeDroppedAttrCount: u32,
343 pub ScopeSchemaUrl: String,
344 pub MetricName: String,
345 pub MetricDescription: String,
346 pub MetricUnit: String,
347 pub Attributes: IndexMap<String, String>,
348 pub StartTimeUnix: DateTime64<9>,
349 pub TimeUnix: DateTime64<9>,
350 pub Count: u64,
351 pub Sum: f64,
352 pub Scale: i32,
353 pub ZeroCount: u64,
354 pub PositiveOffset: i32,
355 pub PositiveBucketCounts: Vec<u64>,
356 pub NegativeOffset: i32,
357 pub NegativeBucketCounts: Vec<u64>,
358 pub Flags: u32,
359 pub Min: f64,
360 pub Max: f64,
361 pub AggregationTemporality: i32,
362}
363
364fn attrs_from_pairs(pairs: &[(String, String)]) -> IndexMap<String, String> {
365 pairs.iter().cloned().collect()
366}
367
368fn single_attr(key: &str, value: &str) -> IndexMap<String, String> {
369 let mut attrs = IndexMap::new();
370 attrs.insert(key.to_string(), value.to_string());
371 attrs
372}
373
374fn cumulative_to_delta_buckets(
375 cumulative: impl IntoIterator<Item = (u64, u64)>,
376) -> (Vec<u64>, Vec<f64>) {
377 let iter = cumulative.into_iter();
378 let (lower, _) = iter.size_hint();
379 let mut bucket_counts = Vec::with_capacity(lower);
380 let mut explicit_bounds = Vec::with_capacity(lower.saturating_sub(1));
381 let mut prev = 0u64;
382
383 for (bound, cumulative_count) in iter {
384 bucket_counts.push(cumulative_count.saturating_sub(prev));
385 prev = cumulative_count;
386 if bound != u64::MAX {
387 explicit_bounds.push(bound as f64);
388 }
389 }
390
391 (bucket_counts, explicit_bounds)
392}
393
394fn compact_positive_buckets(snap: &ExpBucketsSnapshot) -> (i32, Vec<u64>) {
395 let first = snap.positive.iter().position(|&count| count > 0);
396 let last = snap.positive.iter().rposition(|&count| count > 0);
397 match (first, last) {
398 (Some(first), Some(last)) => (first as i32, snap.positive[first..=last].to_vec()),
399 _ => (0, Vec::new()),
400 }
401}
402
403impl ClickHouseExport for Counter {
404 fn export_clickhouse(
405 &self,
406 batch: &mut ClickHouseMetricBatch,
407 name: &str,
408 description: &str,
409 time_unix_nano: u64,
410 ) {
411 batch.push_sum(
412 name,
413 description,
414 IndexMap::new(),
415 self.sum() as f64,
416 false,
417 time_unix_nano,
418 );
419 }
420}
421
422impl ClickHouseExport for Gauge {
423 fn export_clickhouse(
424 &self,
425 batch: &mut ClickHouseMetricBatch,
426 name: &str,
427 description: &str,
428 time_unix_nano: u64,
429 ) {
430 batch.push_gauge(
431 name,
432 description,
433 IndexMap::new(),
434 self.get() as f64,
435 time_unix_nano,
436 );
437 }
438}
439
440impl ClickHouseExport for GaugeF64 {
441 fn export_clickhouse(
442 &self,
443 batch: &mut ClickHouseMetricBatch,
444 name: &str,
445 description: &str,
446 time_unix_nano: u64,
447 ) {
448 batch.push_gauge(
449 name,
450 description,
451 IndexMap::new(),
452 self.get(),
453 time_unix_nano,
454 );
455 }
456}
457
458impl ClickHouseExport for MaxGauge {
459 fn export_clickhouse(
460 &self,
461 batch: &mut ClickHouseMetricBatch,
462 name: &str,
463 description: &str,
464 time_unix_nano: u64,
465 ) {
466 batch.push_gauge(
467 name,
468 description,
469 IndexMap::new(),
470 self.get() as f64,
471 time_unix_nano,
472 );
473 }
474}
475
476impl ClickHouseExport for MaxGaugeF64 {
477 fn export_clickhouse(
478 &self,
479 batch: &mut ClickHouseMetricBatch,
480 name: &str,
481 description: &str,
482 time_unix_nano: u64,
483 ) {
484 batch.push_gauge(
485 name,
486 description,
487 IndexMap::new(),
488 self.get(),
489 time_unix_nano,
490 );
491 }
492}
493
494impl ClickHouseExport for MinGauge {
495 fn export_clickhouse(
496 &self,
497 batch: &mut ClickHouseMetricBatch,
498 name: &str,
499 description: &str,
500 time_unix_nano: u64,
501 ) {
502 batch.push_gauge(
503 name,
504 description,
505 IndexMap::new(),
506 self.get() as f64,
507 time_unix_nano,
508 );
509 }
510}
511
512impl ClickHouseExport for MinGaugeF64 {
513 fn export_clickhouse(
514 &self,
515 batch: &mut ClickHouseMetricBatch,
516 name: &str,
517 description: &str,
518 time_unix_nano: u64,
519 ) {
520 batch.push_gauge(
521 name,
522 description,
523 IndexMap::new(),
524 self.get(),
525 time_unix_nano,
526 );
527 }
528}
529
530impl ClickHouseExport for Histogram {
531 fn export_clickhouse(
532 &self,
533 batch: &mut ClickHouseMetricBatch,
534 name: &str,
535 description: &str,
536 time_unix_nano: u64,
537 ) {
538 batch.push_histogram(name, description, IndexMap::new(), self, time_unix_nano);
539 }
540}
541
542impl ClickHouseExport for Distribution {
543 fn export_clickhouse(
544 &self,
545 batch: &mut ClickHouseMetricBatch,
546 name: &str,
547 description: &str,
548 time_unix_nano: u64,
549 ) {
550 let snap = self.buckets_snapshot();
551 batch.push_distribution(name, description, IndexMap::new(), &snap, time_unix_nano);
552 }
553}
554
555impl ClickHouseExport for SampledTimer {
556 fn export_clickhouse(
557 &self,
558 batch: &mut ClickHouseMetricBatch,
559 name: &str,
560 description: &str,
561 time_unix_nano: u64,
562 ) {
563 let calls_name = format!("{name}_calls");
564 let samples_name = format!("{name}_samples");
565 batch.push_sum(
566 &calls_name,
567 description,
568 IndexMap::new(),
569 self.calls() as f64,
570 false,
571 time_unix_nano,
572 );
573 batch.push_histogram(
574 &samples_name,
575 description,
576 IndexMap::new(),
577 self.histogram(),
578 time_unix_nano,
579 );
580 }
581}
582
583impl ClickHouseExport for DynamicCounter {
584 fn export_clickhouse(
585 &self,
586 batch: &mut ClickHouseMetricBatch,
587 name: &str,
588 description: &str,
589 time_unix_nano: u64,
590 ) {
591 self.visit_series(|labels, value| {
592 batch.push_sum(
593 name,
594 description,
595 attrs_from_pairs(labels),
596 value as f64,
597 false,
598 time_unix_nano,
599 );
600 });
601 }
602}
603
604impl ClickHouseExport for DynamicGauge {
605 fn export_clickhouse(
606 &self,
607 batch: &mut ClickHouseMetricBatch,
608 name: &str,
609 description: &str,
610 time_unix_nano: u64,
611 ) {
612 for (labels, value) in self.snapshot() {
613 batch.push_gauge(
614 name,
615 description,
616 attrs_from_pairs(labels.pairs()),
617 value,
618 time_unix_nano,
619 );
620 }
621 }
622}
623
624impl ClickHouseExport for DynamicGaugeI64 {
625 fn export_clickhouse(
626 &self,
627 batch: &mut ClickHouseMetricBatch,
628 name: &str,
629 description: &str,
630 time_unix_nano: u64,
631 ) {
632 for (labels, value) in self.snapshot() {
633 batch.push_gauge(
634 name,
635 description,
636 attrs_from_pairs(labels.pairs()),
637 value as f64,
638 time_unix_nano,
639 );
640 }
641 }
642}
643
644impl ClickHouseExport for DynamicHistogram {
645 fn export_clickhouse(
646 &self,
647 batch: &mut ClickHouseMetricBatch,
648 name: &str,
649 description: &str,
650 time_unix_nano: u64,
651 ) {
652 self.visit_series(|labels, series| {
653 batch.push_histogram_parts(
654 name,
655 description,
656 HistogramParts {
657 attrs: attrs_from_pairs(labels),
658 buckets_cumulative: series.buckets_cumulative_iter(),
659 sum: series.sum(),
660 count: series.count(),
661 },
662 time_unix_nano,
663 );
664 });
665 }
666}
667
668impl ClickHouseExport for DynamicDistribution {
669 fn export_clickhouse(
670 &self,
671 batch: &mut ClickHouseMetricBatch,
672 name: &str,
673 description: &str,
674 time_unix_nano: u64,
675 ) {
676 self.visit_series(|labels, _count, _sum, snap| {
677 batch.push_distribution(
678 name,
679 description,
680 attrs_from_pairs(labels),
681 &snap,
682 time_unix_nano,
683 );
684 });
685 }
686}
687
688impl<L: LabelEnum> ClickHouseExport for LabeledCounter<L> {
689 fn export_clickhouse(
690 &self,
691 batch: &mut ClickHouseMetricBatch,
692 name: &str,
693 description: &str,
694 time_unix_nano: u64,
695 ) {
696 for (label, value) in self.iter() {
697 batch.push_sum(
698 name,
699 description,
700 single_attr(L::LABEL_NAME, label.variant_name()),
701 value as f64,
702 false,
703 time_unix_nano,
704 );
705 }
706 }
707}
708
709impl<L: LabelEnum> ClickHouseExport for LabeledGauge<L> {
710 fn export_clickhouse(
711 &self,
712 batch: &mut ClickHouseMetricBatch,
713 name: &str,
714 description: &str,
715 time_unix_nano: u64,
716 ) {
717 for (label, value) in self.iter() {
718 batch.push_gauge(
719 name,
720 description,
721 single_attr(L::LABEL_NAME, label.variant_name()),
722 value as f64,
723 time_unix_nano,
724 );
725 }
726 }
727}
728
729impl<L: LabelEnum> ClickHouseExport for LabeledHistogram<L> {
730 fn export_clickhouse(
731 &self,
732 batch: &mut ClickHouseMetricBatch,
733 name: &str,
734 description: &str,
735 time_unix_nano: u64,
736 ) {
737 for (label, histogram) in self.iter() {
738 batch.push_histogram(
739 name,
740 description,
741 single_attr(L::LABEL_NAME, label.variant_name()),
742 histogram,
743 time_unix_nano,
744 );
745 }
746 }
747}
748
749impl<L: LabelEnum> ClickHouseExport for LabeledSampledTimer<L> {
750 fn export_clickhouse(
751 &self,
752 batch: &mut ClickHouseMetricBatch,
753 name: &str,
754 description: &str,
755 time_unix_nano: u64,
756 ) {
757 let calls_name = format!("{name}_calls");
758 let samples_name = format!("{name}_samples");
759 for (label, calls, samples) in self.iter() {
760 let attrs = single_attr(L::LABEL_NAME, label.variant_name());
761 batch.push_sum(
762 &calls_name,
763 description,
764 attrs.clone(),
765 calls.sum() as f64,
766 false,
767 time_unix_nano,
768 );
769 batch.push_histogram(&samples_name, description, attrs, samples, time_unix_nano);
770 }
771 }
772}
773
774#[cfg(test)]
775mod tests {
776 use super::*;
777
778 #[test]
779 fn exports_counter_without_otlp_metric() {
780 let counter = Counter::new(4);
781 counter.add(42);
782 let mut batch = ClickHouseMetricBatch::new("test");
783 counter.export_clickhouse(&mut batch, "requests", "request count", 123);
784
785 assert_eq!(batch.total_rows(), 1);
786 assert_eq!(batch.sums[0].MetricName, "requests");
787 assert_eq!(batch.sums[0].Value, 42.0);
788 }
789
790 #[test]
791 fn exports_distribution_as_exp_histogram() {
792 let distribution = Distribution::new(4);
793 distribution.record(0);
794 distribution.record(10);
795 let mut batch = ClickHouseMetricBatch::new("test");
796 distribution.export_clickhouse(&mut batch, "sizes", "", 123);
797
798 assert_eq!(batch.exp_histograms.len(), 1);
799 assert_eq!(batch.exp_histograms[0].ZeroCount, 1);
800 assert_eq!(batch.exp_histograms[0].Count, 2);
801 assert!(!batch.exp_histograms[0].PositiveBucketCounts.is_empty());
802 }
803}