1use std::{
2 collections::HashMap,
3 pin::pin,
4 time::{Duration, SystemTime},
5};
6
7use exponential_histogram::ExponentialHistogram;
8use futures::{Stream, StreamExt};
9use tokio::sync::mpsc;
10use tokio_stream::wrappers::ReceiverStream;
11use tonic::metadata::AsciiMetadataValue;
12
13use crate::{
14 aggregation::{
15 AbsorbDistribution, Aggregation, Centroid, Histogram, OnlineTdigest, StatisticSet,
16 },
17 pipeline::{AggregatedMetricsMap, AggregationBatcher, DimensionedMeasurementsMap},
18 proto::{
19 self,
20 goodmetrics::{metrics_client::MetricsClient, Datum, MetricsRequest},
21 },
22 types::{Dimension, Distribution, Measurement, Name, Observation},
23};
24
25use super::{EpochTime, StdError};
26
/// A downstream that delivers batches of metrics to a goodmetrics server
/// over gRPC.
pub struct GoodmetricsDownstream<TChannel> {
    /// gRPC client used to send `MetricsRequest`s.
    client: MetricsClient<TChannel>,
    /// Optional (name, value) header attached to every outgoing request,
    /// e.g. an authorization token.
    header: Option<(&'static str, AsciiMetadataValue)>,
    /// Dimensions copied into every request's `shared_dimensions` field.
    shared_dimensions: HashMap<String, proto::goodmetrics::Dimension>,
}
33
34impl<TChannel> GoodmetricsDownstream<TChannel>
35where
36 TChannel: tonic::client::GrpcService<tonic::body::BoxBody>,
37 TChannel::Error: Into<StdError>,
38 TChannel::ResponseBody: http_body::Body<Data = bytes::Bytes> + Send + 'static,
39 <TChannel::ResponseBody as http_body::Body>::Error: Into<StdError> + Send,
40{
41 pub fn new(
43 client: MetricsClient<TChannel>,
44 header: Option<(&'static str, AsciiMetadataValue)>,
45 shared_dimensions: impl IntoIterator<Item = (impl Into<String>, impl Into<Dimension>)>,
46 ) -> Self {
47 GoodmetricsDownstream {
48 client,
49 header,
50 shared_dimensions: shared_dimensions
51 .into_iter()
52 .map(|(k, v)| (k.into(), v.into().into()))
53 .collect(),
54 }
55 }
56
57 pub async fn send_batches_forever(self, receiver: mpsc::Receiver<Vec<Datum>>) {
59 self.send_metrics_stream_forever(ReceiverStream::new(receiver))
60 .await;
61 }
62
63 pub async fn send_metrics_stream_forever(
65 mut self,
66 mut receiver: impl Stream<Item = Vec<Datum>> + Unpin,
67 ) {
68 let mut interval = tokio::time::interval(Duration::from_millis(500));
69 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
70 loop {
71 interval.tick().await;
72 while let Some(batch) = pin!(&mut receiver).next().await {
74 let result = self
75 .client
76 .send_metrics(self.request(MetricsRequest {
77 shared_dimensions: self.shared_dimensions.clone(),
78 metrics: batch,
79 }))
80 .await;
81 match result {
82 Ok(success) => {
83 log::debug!("sent metrics: {success:?}");
84 }
85 Err(err) => {
86 if !err.metadata().is_empty() {
87 log::error!(
88 "failed to send metrics: {err}. Metadata: {:?}",
89 err.metadata()
90 );
91 }
92 log::error!("failed to send metrics: {err:?}")
93 }
94 };
95 }
96 }
97 }
98
99 fn request<T>(&self, request: T) -> tonic::Request<T> {
100 let mut request = tonic::Request::new(request);
101 if let Some((header, value)) = self.header.as_ref() {
102 request.metadata_mut().insert(*header, value.clone());
103 }
104 request
105 }
106}
107
108pub struct GoodmetricsBatcher;
110
111impl AggregationBatcher for GoodmetricsBatcher {
112 type TBatch = Vec<Datum>;
113
114 fn batch_aggregations(
115 &mut self,
116 now: SystemTime,
117 covered_time: Duration,
118 aggregations: &mut AggregatedMetricsMap,
119 ) -> Self::TBatch {
120 aggregations
121 .drain()
122 .flat_map(|(name, dimensioned_measurements)| {
123 as_datums(name, now, covered_time, dimensioned_measurements)
124 })
125 .collect()
126 }
127}
128
129fn as_datums(
130 name: Name,
131 timestamp: SystemTime,
132 _duration: Duration,
133 mut dimensioned_measurements: DimensionedMeasurementsMap,
134) -> Vec<Datum> {
135 dimensioned_measurements
136 .drain()
137 .map(|(dimension_position, mut measurements)| Datum {
138 metric: name.to_string(),
139 unix_nanos: timestamp.nanos_since_epoch(),
140 dimensions: dimension_position
141 .into_iter()
142 .map(|(name, dimension)| (name.into(), dimension.into()))
143 .collect(),
144 measurements: measurements
145 .drain()
146 .map(|(name, aggregation)| (name.into(), aggregation.into()))
147 .collect(),
148 })
149 .collect()
150}
151
152impl From<Dimension> for proto::goodmetrics::Dimension {
153 fn from(value: Dimension) -> Self {
154 Self {
155 value: Some(match value {
156 Dimension::Str(s) => proto::goodmetrics::dimension::Value::String(s.to_string()),
157 Dimension::String(s) => proto::goodmetrics::dimension::Value::String(s),
158 Dimension::Shared(s) => proto::goodmetrics::dimension::Value::String(
159 std::sync::Arc::<String>::try_unwrap(s).unwrap_or_else(|this| this.to_string()),
161 ),
162 Dimension::Number(n) => proto::goodmetrics::dimension::Value::Number(n),
163 Dimension::Boolean(b) => proto::goodmetrics::dimension::Value::Boolean(b),
164 }),
165 }
166 }
167}
168
169impl From<Aggregation> for proto::goodmetrics::Measurement {
170 fn from(value: Aggregation) -> Self {
171 Self {
172 value: Some(match value {
173 Aggregation::Histogram(buckets) => {
174 proto::goodmetrics::measurement::Value::Histogram(
175 proto::goodmetrics::Histogram {
176 buckets: buckets.into_map(),
177 },
178 )
179 }
180 Aggregation::StatisticSet(statistic_set) => {
181 proto::goodmetrics::measurement::Value::StatisticSet(statistic_set.into())
182 }
183 Aggregation::Sum(sum) => proto::goodmetrics::measurement::Value::I64(sum.sum),
184 Aggregation::TDigest(t_digest) => {
185 proto::goodmetrics::measurement::Value::Tdigest(t_digest.into())
186 }
187 Aggregation::ExponentialHistogram(histogram) => {
188 proto::goodmetrics::measurement::Value::Histogram(
189 proto::goodmetrics::Histogram {
190 buckets: make_histogram(histogram),
191 },
192 )
193 }
194 }),
195 }
196 }
197}
198
199fn make_histogram(eh: ExponentialHistogram) -> HashMap<i64, u64> {
200 HashMap::from_iter(
201 eh.value_counts()
202 .map(|(value, count)| (value.round() as i64, count as u64)),
203 )
204}
205
206impl From<Measurement> for proto::goodmetrics::Measurement {
207 fn from(value: Measurement) -> Self {
208 proto::goodmetrics::Measurement {
209 value: Some(match value {
210 Measurement::Observation(observation) => observation.into(),
211 Measurement::Distribution(distribution) => distribution.into(),
212 Measurement::Sum(sum) => proto::goodmetrics::measurement::Value::I64(sum),
213 }),
214 }
215 }
216}
217
218impl From<Observation> for proto::goodmetrics::measurement::Value {
219 fn from(value: Observation) -> Self {
220 Self::StatisticSet(proto::goodmetrics::StatisticSet {
221 minimum: (&value).into(),
222 maximum: (&value).into(),
223 samplesum: (&value).into(),
224 samplecount: 1,
225 })
226 }
227}
228
229impl From<StatisticSet> for proto::goodmetrics::StatisticSet {
230 fn from(value: StatisticSet) -> Self {
231 Self {
232 minimum: value.min as f64,
233 maximum: value.max as f64,
234 samplesum: value.sum as f64,
235 samplecount: value.count,
236 }
237 }
238}
239
240impl From<Distribution> for proto::goodmetrics::measurement::Value {
241 fn from(value: Distribution) -> Self {
242 let mut histogram = Histogram::default();
243 histogram.absorb(value);
244 Self::Histogram(proto::goodmetrics::Histogram {
245 buckets: histogram.into_map(),
246 })
247 }
248}
249
250impl From<OnlineTdigest> for proto::goodmetrics::TDigest {
251 fn from(mut value: OnlineTdigest) -> Self {
252 let mut v = value.reset_mut();
253 Self {
254 centroids: v.drain_centroids().map(|c| c.into()).collect(),
255 sum: v.sum(),
256 count: v.count() as u64,
257 max: v.max(),
258 min: v.min(),
259 }
260 }
261}
262
263impl From<Centroid> for proto::goodmetrics::t_digest::Centroid {
264 fn from(value: Centroid) -> Self {
265 Self {
266 mean: value.mean(),
267 weight: value.weight() as u64,
268 }
269 }
270}