1use std::{
2 cell::UnsafeCell,
3 collections::HashMap,
4 fmt, iter,
5 sync::{
6 atomic::{AtomicU64, Ordering},
7 Arc, Mutex, RwLock,
8 },
9};
10
11use prometheus_client::{
12 encoding::{EncodeLabelSet, MetricEncoder, NoLabelSet},
13 metrics::{MetricType, TypedMetric},
14};
15use smallvec::SmallVec;
16use zenoh_keyexpr::{
17 keyexpr,
18 keyexpr_tree::{IKeyExprTree, IKeyExprTreeMut, IKeyExprTreeNode, KeBoxTree},
19};
20
21use crate::{family::TransportMetric, histogram::HistogramBuckets, labels::LabelsSetRef};
22
23#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
24pub struct StatsKeys(SmallVec<[(u64, usize); 1]>);
25
26#[derive(Default)]
27pub struct StatsKeyCache {
28 keys: UnsafeCell<StatsKeys>,
29 generation: AtomicU64,
30 mutex: Mutex<()>,
31}
32
33unsafe impl Send for StatsKeyCache {}
34unsafe impl Sync for StatsKeyCache {}
35
36#[derive(Default)]
37pub struct StatsKeysTree {
38 generation: u64,
39 tree: Option<KeBoxTree<usize>>,
40}
41
42impl StatsKeysTree {
43 #[inline(always)]
47 pub unsafe fn get_keys<'a>(
48 &self,
49 cache: impl FnOnce() -> Option<&'a StatsKeyCache>,
50 keyexpr: impl FnOnce() -> Option<&'a keyexpr>,
51 ) -> StatsKeys {
52 if self.tree.is_none() {
53 return StatsKeys::default();
54 }
55 if let Some(cache) = cache() {
56 if cache.generation.load(Ordering::Acquire) != self.generation {
57 self.update_cache(cache, keyexpr);
58 }
59 return unsafe { &*cache.keys.get() }.clone();
61 }
62 self.compute_keys(keyexpr)
63 }
64
65 #[cold]
66 fn update_cache<'a>(
67 &self,
68 cache: &StatsKeyCache,
69 keyexpr: impl FnOnce() -> Option<&'a keyexpr>,
70 ) {
71 let keys = self.compute_keys(keyexpr);
74 let _guard = cache.mutex.lock().unwrap();
75 if cache.generation.load(Ordering::Acquire) == self.generation {
77 return;
78 }
79 unsafe { *cache.keys.get() = keys };
80 cache.generation.store(self.generation, Ordering::Release);
81 }
82
83 #[cold]
84 fn compute_keys<'a>(&self, keyexpr: impl FnOnce() -> Option<&'a keyexpr>) -> StatsKeys {
85 let tree = self.tree.as_ref().unwrap();
86 let Some(keyexpr) = keyexpr() else {
87 return StatsKeys::default();
88 };
89 let keys = tree
90 .intersecting_nodes(keyexpr)
91 .filter_map(|n| n.weight().cloned())
92 .map(|key| (self.generation, key))
93 .collect();
94 StatsKeys(keys)
95 }
96}
97
/// Shared registry of the configured stats key expressions.
///
/// Holds the key expressions as strings together with a generation counter that is
/// incremented every time the list is replaced. Clones share the same underlying state.
#[derive(Debug, Default, Clone)]
pub(crate) struct StatsKeysRegistry(Arc<RwLock<(Vec<String>, u64)>>);
100
101impl StatsKeysRegistry {
102 pub(crate) fn update_keys<'a>(
103 &self,
104 tree: &mut StatsKeysTree,
105 keyexprs: impl IntoIterator<Item = &'a keyexpr>,
106 ) {
107 let keyexprs = keyexprs.into_iter().collect::<Vec<_>>();
108 let (keys, generation) = &mut *self.0.write().unwrap();
109 if keys.len() == keyexprs.len()
110 && keys.iter().zip(&keyexprs).all(|(k1, k2)| k1 == k2.as_str())
111 {
112 return;
113 }
114 keys.clear();
115 *generation += 1;
116 tree.generation = *generation;
117 tree.tree = None;
118 for (i, keyexpr) in keyexprs.into_iter().enumerate() {
119 keys.insert(i, keyexpr.to_string());
120 tree.tree
121 .get_or_insert_with(Default::default)
122 .insert(keyexpr, i);
123 }
124 }
125
126 pub(crate) fn keys(&self) -> Vec<String> {
127 self.0.read().unwrap().0.clone()
128 }
129}
130
131#[derive(Debug)]
132struct HistogramPerKeyInner {
133 stats_keys: StatsKeysRegistry,
134 buckets: HistogramBuckets,
135 #[allow(clippy::type_complexity)]
136 map: ahash::HashMap<(u64, usize), (u64, Vec<(u64, u64)>)>,
137}
138
139impl HistogramPerKeyInner {
140 fn histogram(&mut self, key: (u64, usize)) -> &mut (u64, Vec<(u64, u64)>) {
141 self.map.entry(key).or_insert_with(|| {
142 let buckets = (self.buckets.0.iter())
143 .chain([&u64::MAX])
144 .map(|b| (*b, 0))
145 .collect();
146 (0, buckets)
147 })
148 }
149}
150
151#[derive(Debug, Clone)]
152pub(crate) struct HistogramPerKey(Arc<Mutex<HistogramPerKeyInner>>);
153
154impl HistogramPerKey {
155 pub(crate) fn new(buckets: HistogramBuckets, stats_keys: StatsKeysRegistry) -> Self {
156 Self(Arc::new(Mutex::new(HistogramPerKeyInner {
157 stats_keys,
158 buckets,
159 map: Default::default(),
160 })))
161 }
162
163 #[inline(always)]
164 pub(crate) fn observe(&self, keys: &StatsKeys, value: u64) {
165 if keys.0.is_empty() {
166 return;
167 }
168 self.observe_cold(keys, value);
169 }
170
171 #[cold]
172 fn observe_cold(&self, keys: &StatsKeys, value: u64) {
173 let inner = &mut *self.0.lock().unwrap();
174 for key in keys.0.iter().copied() {
175 let (sum, buckets) = inner.histogram(key);
176 let (_, count) = buckets.iter_mut().find(|(b, _)| value <= *b).unwrap();
177 *count += 1;
178 *sum += value;
179 }
180 }
181}
182
183impl TypedMetric for HistogramPerKey {
184 const TYPE: MetricType = MetricType::Histogram;
185}
186
187impl TransportMetric for HistogramPerKey {
188 type Collected = HashMap<String, (f64, u64, Vec<(f64, u64)>)>;
189
190 fn drain_into(&self, other: &Self) {
191 let inner = &mut *self.0.lock().unwrap();
192 let other = &mut other.0.lock().unwrap();
193 for (key, (sum, buckets)) in inner.map.drain() {
194 let (other_sum, other_buckets) = other.histogram(key);
195 *other_sum += sum;
196 for ((b, c), (other_b, other_c)) in iter::zip(buckets, other_buckets) {
197 debug_assert_eq!(b, *other_b);
198 *other_c += c;
199 }
200 }
201 }
202
203 fn collect(&self) -> Self::Collected {
204 let inner = &mut *self.0.lock().unwrap();
205 let (keys, generation) = &*inner.stats_keys.0.read().unwrap();
206 let map_histogram = |sum, buckets: &[(u64, u64)]| {
207 (
208 sum as f64,
209 buckets.iter().map(|(_, c)| c).sum(),
210 buckets.iter().map(|(b, c)| (*b as f64, *c)).collect(),
211 )
212 };
213 let mut collected = HashMap::new();
214 inner.map.retain(|(gen, key), (sum, buckets)| {
215 if gen != generation {
216 return false;
217 }
218 collected.insert(keys[*key].clone(), map_histogram(*sum, buckets));
219 true
220 });
221 collected
222 }
223
224 fn sum_collected(collected: &mut Self::Collected, other: &Self::Collected) {
225 for (other_key, other) in other {
226 if let Some((sum, count, buckets)) = collected.get_mut(other_key) {
227 let (other_sum, other_count, other_buckets) = other;
228 *sum += other_sum;
229 *count += other_count;
230 for ((b, c), (other_b, other_c)) in iter::zip(buckets, other_buckets) {
231 debug_assert_eq!(b, other_b);
232 *c += other_c;
233 }
234 } else {
235 collected.insert(other_key.clone(), other.clone());
236 }
237 }
238 }
239
240 fn encode(
241 encoder: &mut MetricEncoder,
242 labels: &impl EncodeLabelSet,
243 collected: &Self::Collected,
244 ) -> fmt::Result {
245 for (key, (sum, count, buckets)) in collected {
246 encoder
247 .encode_family(&(LabelsSetRef(labels), &[("key", key)] as &[_]))?
248 .encode_histogram::<NoLabelSet>(*sum, *count, buckets, None)?;
249 }
250 Ok(())
251 }
252}