rs_postgres_stat2otel/
metrics.rs

//! Builds observable gauges and an observer closure from custom query definitions.
2
3use std::collections::BTreeMap;
4
5use crate::{
6    col::Column,
7    gauge::{Gauge, GaugeType},
8    label::Label,
9    multi::{Multi, RawMulti},
10    query::CustomQuery,
11    row::Row,
12    single::{RawSingle, Single},
13};
14
15use opentelemetry::{
16    metrics::{Meter, ObservableGauge},
17    Context, KeyValue, Value,
18};
19
20/// Observable Gauges.
21pub struct Metrics {
22    integer: BTreeMap<String, ObservableGauge<i64>>,
23    float: BTreeMap<String, ObservableGauge<f64>>,
24}
25
26impl Metrics {
27    fn observe_integer(&self, c: &Context, key: &str, val: i64, kv: &[KeyValue]) {
28        match self.integer.get(key) {
29            None => {}
30            Some(og) => og.observe(c, val, kv),
31        }
32    }
33
34    fn observe_float(&self, c: &Context, key: &str, val: f64, kv: &[KeyValue]) {
35        match self.float.get(key) {
36            None => {}
37            Some(og) => og.observe(c, val, kv),
38        }
39    }
40
41    /// Observes values got from a row.
42    pub fn observe_single(&self, c: &Context, row: &Row, labels: &[Label]) {
43        let m: BTreeMap<String, Value> = row.to_map();
44        let attrs: Vec<KeyValue> = Label::to_attrs(labels, &m);
45
46        let cols: &[Column] = row.as_columns();
47        for col in cols {
48            let key: &str = col.as_name();
49            let val: &Value = col.as_value();
50            match *val {
51                Value::I64(i) => self.observe_integer(c, key, i, &attrs),
52                Value::F64(f) => self.observe_float(c, key, f, &attrs),
53                _ => {}
54            }
55        }
56    }
57
58    /// Observes values got from rows.
59    pub fn observe(&self, c: &Context, rows: &[Row], labels: &[Label]) {
60        for row in rows {
61            self.observe_single(c, row, labels)
62        }
63    }
64
65    /// Creates new observable gauges.
66    pub fn new(meter: &Meter, gs: &[Gauge]) -> Self {
67        let mut integer: BTreeMap<String, ObservableGauge<i64>> = BTreeMap::new();
68        let mut float: BTreeMap<String, ObservableGauge<f64>> = BTreeMap::new();
69        for g in gs {
70            let name: &str = g.as_name();
71            let typ: GaugeType = g.as_type();
72            match typ {
73                GaugeType::Integer => {
74                    let gi = g.to_integer_gauge(meter);
75                    integer.insert(name.into(), gi);
76                }
77                GaugeType::Float => {
78                    let gf = g.to_float_gauge(meter);
79                    float.insert(name.into(), gf);
80                }
81            }
82        }
83        Metrics { integer, float }
84    }
85}
86
87/// A pair of observable gauges(single, multi)
88pub struct MetricsCollection {
89    single: Vec<(Single, Metrics)>,
90    multi: Vec<(Multi, Metrics)>,
91}
92
93impl MetricsCollection {
94    fn new_sm(meter: &Meter, vs: Vec<Single>, vm: Vec<Multi>) -> Self {
95        let single: Vec<(Single, Metrics)> = vs
96            .into_iter()
97            .map(|s: Single| {
98                let m = Metrics::new(meter, s.as_gauge());
99                (s, m)
100            })
101            .collect();
102        let multi: Vec<(Multi, Metrics)> = vm
103            .into_iter()
104            .map(|multi: Multi| {
105                let m = Metrics::new(meter, multi.as_gauge());
106                (multi, m)
107            })
108            .collect();
109        Self { single, multi }
110    }
111
112    /// Creates new collection using a query info.
113    pub fn new(meter: &Meter, mut q: CustomQuery) -> Self {
114        let rs: Vec<RawSingle> = q.take_single().unwrap_or_default();
115        let rm: Vec<RawMulti> = q.take_multi().unwrap_or_default();
116
117        let vs: Vec<Single> = rs
118            .into_iter()
119            .flat_map(|r: RawSingle| Single::try_from(r).ok())
120            .collect();
121
122        let vm: Vec<Multi> = rm
123            .into_iter()
124            .flat_map(|r: RawMulti| Multi::try_from(r).ok())
125            .collect();
126
127        Self::new_sm(meter, vs, vm)
128    }
129
130    fn observe_single<D, G>(&self, data_source: &mut D, getter: &mut G, c: &Context)
131    where
132        G: FnMut(&mut D, &Single) -> Option<Row>,
133    {
134        for pair in &self.single {
135            let (s, m) = pair;
136            match getter(data_source, s) {
137                None => {}
138                Some(row) => m.observe_single(c, &row, s.as_label()),
139            }
140        }
141    }
142
143    fn observe_multi<D, G>(&self, data_source: &mut D, getter: &mut G, c: &Context)
144    where
145        G: FnMut(&mut D, &Multi) -> Vec<Row>,
146    {
147        for pair in &self.multi {
148            let (multi, m) = pair;
149            let v: Vec<Row> = getter(data_source, multi);
150            let l: &[Label] = multi.as_label();
151            m.observe(c, &v, l);
152        }
153    }
154
155    /// Observes all gauges.
156    pub fn observe<D, M, S>(
157        &self,
158        data_source: &mut D,
159        get_single: &mut S,
160        get_multi: &mut M,
161        c: &Context,
162    ) where
163        S: FnMut(&mut D, &Single) -> Option<Row>,
164        M: FnMut(&mut D, &Multi) -> Vec<Row>,
165    {
166        self.observe_single(data_source, get_single, c);
167        self.observe_multi(data_source, get_multi, c);
168    }
169}
170
171/// Creates new observer.
172pub fn observer_new<D, M, S>(
173    m: MetricsCollection,
174    mut get_single: S,
175    mut get_multi: M,
176    mut shared: D,
177) -> impl FnMut(&Context)
178where
179    S: FnMut(&mut D, &str) -> Option<Row>,
180    M: FnMut(&mut D, &str) -> Vec<Row>,
181{
182    let mut gs = move |d: &mut D, s: &Single| get_single(d, s.as_query());
183
184    let mut gm = move |d: &mut D, m: &Multi| get_multi(d, m.as_query());
185
186    move |c: &Context| m.observe(&mut shared, &mut gs, &mut gm, c)
187}