1use std::collections::BTreeMap;
4
5use crate::{
6 col::Column,
7 gauge::{Gauge, GaugeType},
8 label::Label,
9 multi::{Multi, RawMulti},
10 query::CustomQuery,
11 row::Row,
12 single::{RawSingle, Single},
13};
14
15use opentelemetry::{
16 metrics::{Meter, ObservableGauge},
17 Context, KeyValue, Value,
18};
19
20pub struct Metrics {
22 integer: BTreeMap<String, ObservableGauge<i64>>,
23 float: BTreeMap<String, ObservableGauge<f64>>,
24}
25
26impl Metrics {
27 fn observe_integer(&self, c: &Context, key: &str, val: i64, kv: &[KeyValue]) {
28 match self.integer.get(key) {
29 None => {}
30 Some(og) => og.observe(c, val, kv),
31 }
32 }
33
34 fn observe_float(&self, c: &Context, key: &str, val: f64, kv: &[KeyValue]) {
35 match self.float.get(key) {
36 None => {}
37 Some(og) => og.observe(c, val, kv),
38 }
39 }
40
41 pub fn observe_single(&self, c: &Context, row: &Row, labels: &[Label]) {
43 let m: BTreeMap<String, Value> = row.to_map();
44 let attrs: Vec<KeyValue> = Label::to_attrs(labels, &m);
45
46 let cols: &[Column] = row.as_columns();
47 for col in cols {
48 let key: &str = col.as_name();
49 let val: &Value = col.as_value();
50 match *val {
51 Value::I64(i) => self.observe_integer(c, key, i, &attrs),
52 Value::F64(f) => self.observe_float(c, key, f, &attrs),
53 _ => {}
54 }
55 }
56 }
57
58 pub fn observe(&self, c: &Context, rows: &[Row], labels: &[Label]) {
60 for row in rows {
61 self.observe_single(c, row, labels)
62 }
63 }
64
65 pub fn new(meter: &Meter, gs: &[Gauge]) -> Self {
67 let mut integer: BTreeMap<String, ObservableGauge<i64>> = BTreeMap::new();
68 let mut float: BTreeMap<String, ObservableGauge<f64>> = BTreeMap::new();
69 for g in gs {
70 let name: &str = g.as_name();
71 let typ: GaugeType = g.as_type();
72 match typ {
73 GaugeType::Integer => {
74 let gi = g.to_integer_gauge(meter);
75 integer.insert(name.into(), gi);
76 }
77 GaugeType::Float => {
78 let gf = g.to_float_gauge(meter);
79 float.insert(name.into(), gf);
80 }
81 }
82 }
83 Metrics { integer, float }
84 }
85}
86
87pub struct MetricsCollection {
89 single: Vec<(Single, Metrics)>,
90 multi: Vec<(Multi, Metrics)>,
91}
92
93impl MetricsCollection {
94 fn new_sm(meter: &Meter, vs: Vec<Single>, vm: Vec<Multi>) -> Self {
95 let single: Vec<(Single, Metrics)> = vs
96 .into_iter()
97 .map(|s: Single| {
98 let m = Metrics::new(meter, s.as_gauge());
99 (s, m)
100 })
101 .collect();
102 let multi: Vec<(Multi, Metrics)> = vm
103 .into_iter()
104 .map(|multi: Multi| {
105 let m = Metrics::new(meter, multi.as_gauge());
106 (multi, m)
107 })
108 .collect();
109 Self { single, multi }
110 }
111
112 pub fn new(meter: &Meter, mut q: CustomQuery) -> Self {
114 let rs: Vec<RawSingle> = q.take_single().unwrap_or_default();
115 let rm: Vec<RawMulti> = q.take_multi().unwrap_or_default();
116
117 let vs: Vec<Single> = rs
118 .into_iter()
119 .flat_map(|r: RawSingle| Single::try_from(r).ok())
120 .collect();
121
122 let vm: Vec<Multi> = rm
123 .into_iter()
124 .flat_map(|r: RawMulti| Multi::try_from(r).ok())
125 .collect();
126
127 Self::new_sm(meter, vs, vm)
128 }
129
130 fn observe_single<D, G>(&self, data_source: &mut D, getter: &mut G, c: &Context)
131 where
132 G: FnMut(&mut D, &Single) -> Option<Row>,
133 {
134 for pair in &self.single {
135 let (s, m) = pair;
136 match getter(data_source, s) {
137 None => {}
138 Some(row) => m.observe_single(c, &row, s.as_label()),
139 }
140 }
141 }
142
143 fn observe_multi<D, G>(&self, data_source: &mut D, getter: &mut G, c: &Context)
144 where
145 G: FnMut(&mut D, &Multi) -> Vec<Row>,
146 {
147 for pair in &self.multi {
148 let (multi, m) = pair;
149 let v: Vec<Row> = getter(data_source, multi);
150 let l: &[Label] = multi.as_label();
151 m.observe(c, &v, l);
152 }
153 }
154
155 pub fn observe<D, M, S>(
157 &self,
158 data_source: &mut D,
159 get_single: &mut S,
160 get_multi: &mut M,
161 c: &Context,
162 ) where
163 S: FnMut(&mut D, &Single) -> Option<Row>,
164 M: FnMut(&mut D, &Multi) -> Vec<Row>,
165 {
166 self.observe_single(data_source, get_single, c);
167 self.observe_multi(data_source, get_multi, c);
168 }
169}
170
171pub fn observer_new<D, M, S>(
173 m: MetricsCollection,
174 mut get_single: S,
175 mut get_multi: M,
176 mut shared: D,
177) -> impl FnMut(&Context)
178where
179 S: FnMut(&mut D, &str) -> Option<Row>,
180 M: FnMut(&mut D, &str) -> Vec<Row>,
181{
182 let mut gs = move |d: &mut D, s: &Single| get_single(d, s.as_query());
183
184 let mut gm = move |d: &mut D, m: &Multi| get_multi(d, m.as_query());
185
186 move |c: &Context| m.observe(&mut shared, &mut gs, &mut gm, c)
187}