goodmetrics/downstream/
goodmetrics_downstream.rs

1use std::{
2    collections::HashMap,
3    pin::pin,
4    time::{Duration, SystemTime},
5};
6
7use exponential_histogram::ExponentialHistogram;
8use futures::{Stream, StreamExt};
9use tokio::sync::mpsc;
10use tokio_stream::wrappers::ReceiverStream;
11use tonic::metadata::AsciiMetadataValue;
12
13use crate::{
14    aggregation::{
15        AbsorbDistribution, Aggregation, Centroid, Histogram, OnlineTdigest, StatisticSet,
16    },
17    pipeline::{AggregatedMetricsMap, AggregationBatcher, DimensionedMeasurementsMap},
18    proto::{
19        self,
20        goodmetrics::{metrics_client::MetricsClient, Datum, MetricsRequest},
21    },
22    types::{Dimension, Distribution, Measurement, Name, Observation},
23};
24
25use super::{EpochTime, StdError};
26
27/// A downstream that sends metrics to a `goodmetricsd` or other goodmetrics grpc server.
28pub struct GoodmetricsDownstream<TChannel> {
29    client: MetricsClient<TChannel>,
30    header: Option<(&'static str, AsciiMetadataValue)>,
31    shared_dimensions: HashMap<String, proto::goodmetrics::Dimension>,
32}
33
34impl<TChannel> GoodmetricsDownstream<TChannel>
35where
36    TChannel: tonic::client::GrpcService<tonic::body::BoxBody>,
37    TChannel::Error: Into<StdError>,
38    TChannel::ResponseBody: http_body::Body<Data = bytes::Bytes> + Send + 'static,
39    <TChannel::ResponseBody as http_body::Body>::Error: Into<StdError> + Send,
40{
41    /// Create a new goodmetrics sender from a grpc channel
42    pub fn new(
43        client: MetricsClient<TChannel>,
44        header: Option<(&'static str, AsciiMetadataValue)>,
45        shared_dimensions: impl IntoIterator<Item = (impl Into<String>, impl Into<Dimension>)>,
46    ) -> Self {
47        GoodmetricsDownstream {
48            client,
49            header,
50            shared_dimensions: shared_dimensions
51                .into_iter()
52                .map(|(k, v)| (k.into(), v.into().into()))
53                .collect(),
54        }
55    }
56
57    /// Spawn this on a tokio runtime to send your metrics to your downstream receiver
58    pub async fn send_batches_forever(self, receiver: mpsc::Receiver<Vec<Datum>>) {
59        self.send_metrics_stream_forever(ReceiverStream::new(receiver))
60            .await;
61    }
62
63    /// Spawn this on a tokio runtime to send your metrics to your downstream receiver
64    pub async fn send_metrics_stream_forever(
65        mut self,
66        mut receiver: impl Stream<Item = Vec<Datum>> + Unpin,
67    ) {
68        let mut interval = tokio::time::interval(Duration::from_millis(500));
69        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
70        loop {
71            interval.tick().await;
72            // Send as quickly as possible while there are more batches
73            while let Some(batch) = pin!(&mut receiver).next().await {
74                let result = self
75                    .client
76                    .send_metrics(self.request(MetricsRequest {
77                        shared_dimensions: self.shared_dimensions.clone(),
78                        metrics: batch,
79                    }))
80                    .await;
81                match result {
82                    Ok(success) => {
83                        log::debug!("sent metrics: {success:?}");
84                    }
85                    Err(err) => {
86                        if !err.metadata().is_empty() {
87                            log::error!(
88                                "failed to send metrics: {err}. Metadata: {:?}",
89                                err.metadata()
90                            );
91                        }
92                        log::error!("failed to send metrics: {err:?}")
93                    }
94                };
95            }
96        }
97    }
98
99    fn request<T>(&self, request: T) -> tonic::Request<T> {
100        let mut request = tonic::Request::new(request);
101        if let Some((header, value)) = self.header.as_ref() {
102            request.metadata_mut().insert(*header, value.clone());
103        }
104        request
105    }
106}
107
108/// The default mapping from in-memory representation to goodmetrics wire representation
109pub struct GoodmetricsBatcher;
110
111impl AggregationBatcher for GoodmetricsBatcher {
112    type TBatch = Vec<Datum>;
113
114    fn batch_aggregations(
115        &mut self,
116        now: SystemTime,
117        covered_time: Duration,
118        aggregations: &mut AggregatedMetricsMap,
119    ) -> Self::TBatch {
120        aggregations
121            .drain()
122            .flat_map(|(name, dimensioned_measurements)| {
123                as_datums(name, now, covered_time, dimensioned_measurements)
124            })
125            .collect()
126    }
127}
128
129fn as_datums(
130    name: Name,
131    timestamp: SystemTime,
132    _duration: Duration,
133    mut dimensioned_measurements: DimensionedMeasurementsMap,
134) -> Vec<Datum> {
135    dimensioned_measurements
136        .drain()
137        .map(|(dimension_position, mut measurements)| Datum {
138            metric: name.to_string(),
139            unix_nanos: timestamp.nanos_since_epoch(),
140            dimensions: dimension_position
141                .into_iter()
142                .map(|(name, dimension)| (name.into(), dimension.into()))
143                .collect(),
144            measurements: measurements
145                .drain()
146                .map(|(name, aggregation)| (name.into(), aggregation.into()))
147                .collect(),
148        })
149        .collect()
150}
151
152impl From<Dimension> for proto::goodmetrics::Dimension {
153    fn from(value: Dimension) -> Self {
154        Self {
155            value: Some(match value {
156                Dimension::Str(s) => proto::goodmetrics::dimension::Value::String(s.to_string()),
157                Dimension::String(s) => proto::goodmetrics::dimension::Value::String(s),
158                Dimension::Shared(s) => proto::goodmetrics::dimension::Value::String(
159                    // Let's try to avoid cloning if this is the last place the string is shared
160                    std::sync::Arc::<String>::try_unwrap(s).unwrap_or_else(|this| this.to_string()),
161                ),
162                Dimension::Number(n) => proto::goodmetrics::dimension::Value::Number(n),
163                Dimension::Boolean(b) => proto::goodmetrics::dimension::Value::Boolean(b),
164            }),
165        }
166    }
167}
168
169impl From<Aggregation> for proto::goodmetrics::Measurement {
170    fn from(value: Aggregation) -> Self {
171        Self {
172            value: Some(match value {
173                Aggregation::Histogram(buckets) => {
174                    proto::goodmetrics::measurement::Value::Histogram(
175                        proto::goodmetrics::Histogram {
176                            buckets: buckets.into_map(),
177                        },
178                    )
179                }
180                Aggregation::StatisticSet(statistic_set) => {
181                    proto::goodmetrics::measurement::Value::StatisticSet(statistic_set.into())
182                }
183                Aggregation::Sum(sum) => proto::goodmetrics::measurement::Value::I64(sum.sum),
184                Aggregation::TDigest(t_digest) => {
185                    proto::goodmetrics::measurement::Value::Tdigest(t_digest.into())
186                }
187                Aggregation::ExponentialHistogram(histogram) => {
188                    proto::goodmetrics::measurement::Value::Histogram(
189                        proto::goodmetrics::Histogram {
190                            buckets: make_histogram(histogram),
191                        },
192                    )
193                }
194            }),
195        }
196    }
197}
198
199fn make_histogram(eh: ExponentialHistogram) -> HashMap<i64, u64> {
200    HashMap::from_iter(
201        eh.value_counts()
202            .map(|(value, count)| (value.round() as i64, count as u64)),
203    )
204}
205
206impl From<Measurement> for proto::goodmetrics::Measurement {
207    fn from(value: Measurement) -> Self {
208        proto::goodmetrics::Measurement {
209            value: Some(match value {
210                Measurement::Observation(observation) => observation.into(),
211                Measurement::Distribution(distribution) => distribution.into(),
212                Measurement::Sum(sum) => proto::goodmetrics::measurement::Value::I64(sum),
213            }),
214        }
215    }
216}
217
218impl From<Observation> for proto::goodmetrics::measurement::Value {
219    fn from(value: Observation) -> Self {
220        Self::StatisticSet(proto::goodmetrics::StatisticSet {
221            minimum: (&value).into(),
222            maximum: (&value).into(),
223            samplesum: (&value).into(),
224            samplecount: 1,
225        })
226    }
227}
228
229impl From<StatisticSet> for proto::goodmetrics::StatisticSet {
230    fn from(value: StatisticSet) -> Self {
231        Self {
232            minimum: value.min as f64,
233            maximum: value.max as f64,
234            samplesum: value.sum as f64,
235            samplecount: value.count,
236        }
237    }
238}
239
240impl From<Distribution> for proto::goodmetrics::measurement::Value {
241    fn from(value: Distribution) -> Self {
242        let mut histogram = Histogram::default();
243        histogram.absorb(value);
244        Self::Histogram(proto::goodmetrics::Histogram {
245            buckets: histogram.into_map(),
246        })
247    }
248}
249
250impl From<OnlineTdigest> for proto::goodmetrics::TDigest {
251    fn from(mut value: OnlineTdigest) -> Self {
252        let mut v = value.reset_mut();
253        Self {
254            centroids: v.drain_centroids().map(|c| c.into()).collect(),
255            sum: v.sum(),
256            count: v.count() as u64,
257            max: v.max(),
258            min: v.min(),
259        }
260    }
261}
262
263impl From<Centroid> for proto::goodmetrics::t_digest::Centroid {
264    fn from(value: Centroid) -> Self {
265        Self {
266            mean: value.mean(),
267            weight: value.weight() as u64,
268        }
269    }
270}