Skip to main content

zenoh_stats/
keys.rs

1use std::{
2    cell::UnsafeCell,
3    collections::HashMap,
4    fmt, iter,
5    sync::{
6        atomic::{AtomicU64, Ordering},
7        Arc, Mutex, RwLock,
8    },
9};
10
11use prometheus_client::{
12    encoding::{EncodeLabelSet, MetricEncoder, NoLabelSet},
13    metrics::{MetricType, TypedMetric},
14};
15use smallvec::SmallVec;
16use zenoh_keyexpr::{
17    keyexpr,
18    keyexpr_tree::{IKeyExprTree, IKeyExprTreeMut, IKeyExprTreeNode, KeBoxTree},
19};
20
21use crate::{family::TransportMetric, histogram::HistogramBuckets, labels::LabelsSetRef};
22
23#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
24pub struct StatsKeys(SmallVec<[(u64, usize); 1]>);
25
26#[derive(Default)]
27pub struct StatsKeyCache {
28    keys: UnsafeCell<StatsKeys>,
29    generation: AtomicU64,
30    mutex: Mutex<()>,
31}
32
33unsafe impl Send for StatsKeyCache {}
34unsafe impl Sync for StatsKeyCache {}
35
36#[derive(Default)]
37pub struct StatsKeysTree {
38    generation: u64,
39    tree: Option<KeBoxTree<usize>>,
40}
41
42impl StatsKeysTree {
43    /// # Safety
44    ///
45    /// The cache must not be used with another tree.
46    #[inline(always)]
47    pub unsafe fn get_keys<'a>(
48        &self,
49        cache: impl FnOnce() -> Option<&'a StatsKeyCache>,
50        keyexpr: impl FnOnce() -> Option<&'a keyexpr>,
51    ) -> StatsKeys {
52        if self.tree.is_none() {
53            return StatsKeys::default();
54        }
55        if let Some(cache) = cache() {
56            if cache.generation.load(Ordering::Acquire) != self.generation {
57                self.update_cache(cache, keyexpr);
58            }
59            // SAFETY: Dereference the raw pointer.
60            return unsafe { &*cache.keys.get() }.clone();
61        }
62        self.compute_keys(keyexpr)
63    }
64
65    #[cold]
66    fn update_cache<'a>(
67        &self,
68        cache: &StatsKeyCache,
69        keyexpr: impl FnOnce() -> Option<&'a keyexpr>,
70    ) {
71        // Compute the key before locking, in order to shorten the critical section.
72        // The keys may be computed twice, but it matters less than blocking on a lock.
73        let keys = self.compute_keys(keyexpr);
74        let _guard = cache.mutex.lock().unwrap();
75        // Do not override the cache if it has already been set.
76        if cache.generation.load(Ordering::Acquire) == self.generation {
77            return;
78        }
79        unsafe { *cache.keys.get() = keys };
80        cache.generation.store(self.generation, Ordering::Release);
81    }
82
83    #[cold]
84    fn compute_keys<'a>(&self, keyexpr: impl FnOnce() -> Option<&'a keyexpr>) -> StatsKeys {
85        let tree = self.tree.as_ref().unwrap();
86        let Some(keyexpr) = keyexpr() else {
87            return StatsKeys::default();
88        };
89        let keys = tree
90            .intersecting_nodes(keyexpr)
91            .filter_map(|n| n.weight().cloned())
92            .map(|key| (self.generation, key))
93            .collect();
94        StatsKeys(keys)
95    }
96}
97
/// Shared registry of the key expressions used for per-key statistics.
///
/// The lock protects a pair `(keys, generation)`: the registered key
/// expressions as strings, and a counter incremented by `update_keys` every
/// time that list changes. Cloning the registry clones the `Arc`, so all
/// clones observe the same state.
#[derive(Debug, Default, Clone)]
pub(crate) struct StatsKeysRegistry(Arc<RwLock<(Vec<String>, u64)>>);
100
101impl StatsKeysRegistry {
102    pub(crate) fn update_keys<'a>(
103        &self,
104        tree: &mut StatsKeysTree,
105        keyexprs: impl IntoIterator<Item = &'a keyexpr>,
106    ) {
107        let keyexprs = keyexprs.into_iter().collect::<Vec<_>>();
108        let (keys, generation) = &mut *self.0.write().unwrap();
109        if keys.len() == keyexprs.len()
110            && keys.iter().zip(&keyexprs).all(|(k1, k2)| k1 == k2.as_str())
111        {
112            return;
113        }
114        keys.clear();
115        *generation += 1;
116        tree.generation = *generation;
117        tree.tree = None;
118        for (i, keyexpr) in keyexprs.into_iter().enumerate() {
119            keys.insert(i, keyexpr.to_string());
120            tree.tree
121                .get_or_insert_with(Default::default)
122                .insert(keyexpr, i);
123        }
124    }
125
126    pub(crate) fn keys(&self) -> Vec<String> {
127        self.0.read().unwrap().0.clone()
128    }
129}
130
131#[derive(Debug)]
132struct HistogramPerKeyInner {
133    stats_keys: StatsKeysRegistry,
134    buckets: HistogramBuckets,
135    #[allow(clippy::type_complexity)]
136    map: ahash::HashMap<(u64, usize), (u64, Vec<(u64, u64)>)>,
137}
138
139impl HistogramPerKeyInner {
140    fn histogram(&mut self, key: (u64, usize)) -> &mut (u64, Vec<(u64, u64)>) {
141        self.map.entry(key).or_insert_with(|| {
142            let buckets = (self.buckets.0.iter())
143                .chain([&u64::MAX])
144                .map(|b| (*b, 0))
145                .collect();
146            (0, buckets)
147        })
148    }
149}
150
151#[derive(Debug, Clone)]
152pub(crate) struct HistogramPerKey(Arc<Mutex<HistogramPerKeyInner>>);
153
154impl HistogramPerKey {
155    pub(crate) fn new(buckets: HistogramBuckets, stats_keys: StatsKeysRegistry) -> Self {
156        Self(Arc::new(Mutex::new(HistogramPerKeyInner {
157            stats_keys,
158            buckets,
159            map: Default::default(),
160        })))
161    }
162
163    #[inline(always)]
164    pub(crate) fn observe(&self, keys: &StatsKeys, value: u64) {
165        if keys.0.is_empty() {
166            return;
167        }
168        self.observe_cold(keys, value);
169    }
170
171    #[cold]
172    fn observe_cold(&self, keys: &StatsKeys, value: u64) {
173        let inner = &mut *self.0.lock().unwrap();
174        for key in keys.0.iter().copied() {
175            let (sum, buckets) = inner.histogram(key);
176            let (_, count) = buckets.iter_mut().find(|(b, _)| value <= *b).unwrap();
177            *count += 1;
178            *sum += value;
179        }
180    }
181}
182
183impl TypedMetric for HistogramPerKey {
184    const TYPE: MetricType = MetricType::Histogram;
185}
186
187impl TransportMetric for HistogramPerKey {
188    type Collected = HashMap<String, (f64, u64, Vec<(f64, u64)>)>;
189
190    fn drain_into(&self, other: &Self) {
191        let inner = &mut *self.0.lock().unwrap();
192        let other = &mut other.0.lock().unwrap();
193        for (key, (sum, buckets)) in inner.map.drain() {
194            let (other_sum, other_buckets) = other.histogram(key);
195            *other_sum += sum;
196            for ((b, c), (other_b, other_c)) in iter::zip(buckets, other_buckets) {
197                debug_assert_eq!(b, *other_b);
198                *other_c += c;
199            }
200        }
201    }
202
203    fn collect(&self) -> Self::Collected {
204        let inner = &mut *self.0.lock().unwrap();
205        let (keys, generation) = &*inner.stats_keys.0.read().unwrap();
206        let map_histogram = |sum, buckets: &[(u64, u64)]| {
207            (
208                sum as f64,
209                buckets.iter().map(|(_, c)| c).sum(),
210                buckets.iter().map(|(b, c)| (*b as f64, *c)).collect(),
211            )
212        };
213        let mut collected = HashMap::new();
214        inner.map.retain(|(gen, key), (sum, buckets)| {
215            if gen != generation {
216                return false;
217            }
218            collected.insert(keys[*key].clone(), map_histogram(*sum, buckets));
219            true
220        });
221        collected
222    }
223
224    fn sum_collected(collected: &mut Self::Collected, other: &Self::Collected) {
225        for (other_key, other) in other {
226            if let Some((sum, count, buckets)) = collected.get_mut(other_key) {
227                let (other_sum, other_count, other_buckets) = other;
228                *sum += other_sum;
229                *count += other_count;
230                for ((b, c), (other_b, other_c)) in iter::zip(buckets, other_buckets) {
231                    debug_assert_eq!(b, other_b);
232                    *c += other_c;
233                }
234            } else {
235                collected.insert(other_key.clone(), other.clone());
236            }
237        }
238    }
239
240    fn encode(
241        encoder: &mut MetricEncoder,
242        labels: &impl EncodeLabelSet,
243        collected: &Self::Collected,
244    ) -> fmt::Result {
245        for (key, (sum, count, buckets)) in collected {
246            encoder
247                .encode_family(&(LabelsSetRef(labels), &[("key", key)] as &[_]))?
248                .encode_histogram::<NoLabelSet>(*sum, *count, buckets, None)?;
249        }
250        Ok(())
251    }
252}