Skip to main content

nexo_pairing/
telemetry.rs

1//! Prometheus telemetry for the pairing flow.
2//!
3//! Counters and one push-tracked gauge:
4//!
5//! - `pairing_approvals_total{channel,result}` — counter
6//! - `pairing_codes_expired_total` — counter
7//! - `pairing_bootstrap_tokens_issued_total{profile}` — counter
8//! - `pairing_requests_pending{channel}` — gauge
9//!
10//! Layering note: this crate is a leaf (no `nexo-core` dep) so the
11//! metric primitives ship here and `nexo_core::telemetry::render_prometheus`
12//! stitches the [`render`] block into the exposition response. Same
13//! pattern as `nexo_web_search::telemetry`.
14//!
//! The gauge is push-tracked: [`inc_requests_pending`] /
//! [`dec_requests_pending`] / [`sub_requests_pending`] run on each lifecycle
//! event, and [`set_requests_pending`] performs an authoritative refresh.
//! Drift after a daemon crash is recovered by calling
//! [`crate::refresh_pending_gauge`] against the store.
19
20use dashmap::DashMap;
21use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
22use std::sync::LazyLock;
23
/// Label set of one `pairing_approvals_total` series: a counter exists per
/// distinct `(channel, result)` pair.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ApprovalKey {
    /// `channel` label; empty for `not_found` results (see `inc_approvals`).
    channel: String,
    /// `result` label: `"ok"`, `"expired"` or `"not_found"`.
    result: String,
}
29
30static APPROVALS: LazyLock<DashMap<ApprovalKey, AtomicU64>> = LazyLock::new(DashMap::new);
31static CODES_EXPIRED: AtomicU64 = AtomicU64::new(0);
32static BOOTSTRAP_TOKENS_ISSUED: LazyLock<DashMap<String, AtomicU64>> = LazyLock::new(DashMap::new);
33static REQUESTS_PENDING: LazyLock<DashMap<String, AtomicI64>> = LazyLock::new(DashMap::new);
34
35/// Bump the pairing-approval counter. `result` is one of
36/// `"ok" | "expired" | "not_found"`. `channel` is the empty string for
37/// `not_found` (the row was never located so its channel is unknown).
38pub fn inc_approvals(channel: &str, result: &str) {
39    APPROVALS
40        .entry(ApprovalKey {
41            channel: channel.to_string(),
42            result: result.to_string(),
43        })
44        .or_insert_with(|| AtomicU64::new(0))
45        .fetch_add(1, Ordering::Relaxed);
46}
47
48/// Add `n` expirations to the pruned-codes counter. Bumped from two
49/// sites: per row deleted by [`crate::PairingStore::purge_expired`] and
50/// once per `approve` call that finds a row past TTL.
51pub fn add_codes_expired(n: u64) {
52    CODES_EXPIRED.fetch_add(n, Ordering::Relaxed);
53}
54
55/// Bump the bootstrap-token issuance counter. `profile` reflects the
56/// claims profile that the token will carry.
57pub fn inc_bootstrap_tokens_issued(profile: &str) {
58    BOOTSTRAP_TOKENS_ISSUED
59        .entry(profile.to_string())
60        .or_insert_with(|| AtomicU64::new(0))
61        .fetch_add(1, Ordering::Relaxed);
62}
63
64/// Increment the pending-requests gauge for `channel`.
65pub fn inc_requests_pending(channel: &str) {
66    REQUESTS_PENDING
67        .entry(channel.to_string())
68        .or_insert_with(|| AtomicI64::new(0))
69        .fetch_add(1, Ordering::Relaxed);
70}
71
72/// Decrement the pending-requests gauge for `channel`. Clamps at 0 —
73/// going negative would mean the bookkeeping drifted (a daemon crash
74/// between insert and `+1`, or a double `dec`). We stay at 0 and
75/// rely on [`crate::refresh_pending_gauge`] for authoritative recovery
76/// rather than trusting an underflow.
77pub fn dec_requests_pending(channel: &str) {
78    let entry = REQUESTS_PENDING
79        .entry(channel.to_string())
80        .or_insert_with(|| AtomicI64::new(0));
81    let mut current = entry.load(Ordering::Relaxed);
82    loop {
83        if current <= 0 {
84            entry.store(0, Ordering::Relaxed);
85            return;
86        }
87        match entry.compare_exchange_weak(
88            current,
89            current - 1,
90            Ordering::Relaxed,
91            Ordering::Relaxed,
92        ) {
93            Ok(_) => return,
94            Err(actual) => current = actual,
95        }
96    }
97}
98
99/// Subtract `n` from the pending-requests gauge for `channel` (clamped
100/// at 0). Used by `purge_expired` to batch a per-channel decrement.
101pub fn sub_requests_pending(channel: &str, n: i64) {
102    if n <= 0 {
103        return;
104    }
105    let entry = REQUESTS_PENDING
106        .entry(channel.to_string())
107        .or_insert_with(|| AtomicI64::new(0));
108    let mut current = entry.load(Ordering::Relaxed);
109    loop {
110        let next = (current - n).max(0);
111        match entry.compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed) {
112            Ok(_) => return,
113            Err(actual) => current = actual,
114        }
115    }
116}
117
118/// Authoritative set of the pending-requests gauge for `channel`.
119/// Called by [`crate::refresh_pending_gauge`] from a `SELECT COUNT(*)`
120/// result so a process restart resyncs without relying on
121/// inc/dec bookkeeping.
122pub fn set_requests_pending(channel: &str, value: i64) {
123    REQUESTS_PENDING
124        .entry(channel.to_string())
125        .or_insert_with(|| AtomicI64::new(0))
126        .store(value.max(0), Ordering::Relaxed);
127}
128
129/// Snapshot of every channel currently tracked by the gauge. Used by
130/// the refresh routine to zero out channels that no longer have any
131/// pending rows.
132pub fn pending_channels() -> Vec<String> {
133    REQUESTS_PENDING.iter().map(|e| e.key().clone()).collect()
134}
135
136// === Test helpers — read counter values. ===
137
138pub fn approvals_total(channel: &str, result: &str) -> u64 {
139    APPROVALS
140        .get(&ApprovalKey {
141            channel: channel.to_string(),
142            result: result.to_string(),
143        })
144        .map(|v| v.value().load(Ordering::Relaxed))
145        .unwrap_or(0)
146}
147
148pub fn codes_expired_total() -> u64 {
149    CODES_EXPIRED.load(Ordering::Relaxed)
150}
151
152pub fn bootstrap_tokens_issued_total(profile: &str) -> u64 {
153    BOOTSTRAP_TOKENS_ISSUED
154        .get(profile)
155        .map(|v| v.value().load(Ordering::Relaxed))
156        .unwrap_or(0)
157}
158
159pub fn requests_pending(channel: &str) -> i64 {
160    REQUESTS_PENDING
161        .get(channel)
162        .map(|v| v.value().load(Ordering::Relaxed))
163        .unwrap_or(0)
164}
165
166/// Reset every 26.y counter — test-only. The 26.x counter
167/// (`pairing_inbound_challenged_total`) lives in `nexo-core` and is
168/// not touched here.
169pub fn reset_for_test() {
170    APPROVALS.clear();
171    CODES_EXPIRED.store(0, Ordering::Relaxed);
172    BOOTSTRAP_TOKENS_ISSUED.clear();
173    REQUESTS_PENDING.clear();
174}
175
/// Escape a Prometheus label value.
///
/// Backslash, double quote and line feed become `\\`, `\"` and `\n`. Every
/// other character is copied through unchanged.
fn escape(s: &str) -> String {
    s.chars().fold(String::with_capacity(s.len()), |mut escaped, ch| {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
        escaped
    })
}
188
189/// Append the pairing 26.y metrics block to a Prometheus exposition
190/// buffer. Called by `nexo_core::telemetry::render_prometheus`.
191pub fn render(out: &mut String) {
192    out.push_str(
193        "# HELP pairing_approvals_total Pairing approval attempts by channel and result.\n",
194    );
195    out.push_str("# TYPE pairing_approvals_total counter\n");
196    if APPROVALS.is_empty() {
197        out.push_str("pairing_approvals_total{channel=\"\",result=\"\"} 0\n");
198    } else {
199        let mut rows: Vec<(ApprovalKey, u64)> = APPROVALS
200            .iter()
201            .map(|e| (e.key().clone(), e.value().load(Ordering::Relaxed)))
202            .collect();
203        rows.sort_by(|a, b| {
204            (a.0.channel.clone(), a.0.result.clone())
205                .cmp(&(b.0.channel.clone(), b.0.result.clone()))
206        });
207        for (k, v) in rows {
208            out.push_str(&format!(
209                "pairing_approvals_total{{channel=\"{}\",result=\"{}\"}} {}\n",
210                escape(&k.channel),
211                escape(&k.result),
212                v
213            ));
214        }
215    }
216
217    out.push_str("# HELP pairing_codes_expired_total Pairing setup codes pruned past TTL or rejected as expired on approve.\n");
218    out.push_str("# TYPE pairing_codes_expired_total counter\n");
219    out.push_str(&format!(
220        "pairing_codes_expired_total {}\n",
221        CODES_EXPIRED.load(Ordering::Relaxed)
222    ));
223
224    out.push_str(
225        "# HELP pairing_bootstrap_tokens_issued_total Bootstrap tokens minted by profile.\n",
226    );
227    out.push_str("# TYPE pairing_bootstrap_tokens_issued_total counter\n");
228    if BOOTSTRAP_TOKENS_ISSUED.is_empty() {
229        out.push_str("pairing_bootstrap_tokens_issued_total{profile=\"\"} 0\n");
230    } else {
231        let mut rows: Vec<(String, u64)> = BOOTSTRAP_TOKENS_ISSUED
232            .iter()
233            .map(|e| (e.key().clone(), e.value().load(Ordering::Relaxed)))
234            .collect();
235        rows.sort_by(|a, b| a.0.cmp(&b.0));
236        for (profile, v) in rows {
237            out.push_str(&format!(
238                "pairing_bootstrap_tokens_issued_total{{profile=\"{}\"}} {}\n",
239                escape(&profile),
240                v
241            ));
242        }
243    }
244
245    out.push_str(
246        "# HELP pairing_requests_pending Pending pairing requests by channel (push-tracked).\n",
247    );
248    out.push_str("# TYPE pairing_requests_pending gauge\n");
249    if REQUESTS_PENDING.is_empty() {
250        out.push_str("pairing_requests_pending{channel=\"\"} 0\n");
251    } else {
252        let mut rows: Vec<(String, i64)> = REQUESTS_PENDING
253            .iter()
254            .map(|e| (e.key().clone(), e.value().load(Ordering::Relaxed)))
255            .collect();
256        rows.sort_by(|a, b| a.0.cmp(&b.0));
257        for (channel, v) in rows {
258            out.push_str(&format!(
259                "pairing_requests_pending{{channel=\"{}\"}} {}\n",
260                escape(&channel),
261                v
262            ));
263        }
264    }
265}
266
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    // The metrics are process-global statics, so tests that touch them must
    // run one at a time.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    #[test]
    fn approvals_inc_and_read() {
        let _g = TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        reset_for_test();
        inc_approvals("whatsapp", "ok");
        inc_approvals("whatsapp", "ok");
        inc_approvals("telegram", "expired");
        assert_eq!(approvals_total("whatsapp", "ok"), 2);
        assert_eq!(approvals_total("telegram", "expired"), 1);
        assert_eq!(approvals_total("whatsapp", "expired"), 0);
    }

    #[test]
    fn codes_expired_accumulates() {
        let _g = TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        reset_for_test();
        add_codes_expired(3);
        add_codes_expired(2);
        assert_eq!(codes_expired_total(), 5);
    }

    #[test]
    fn bootstrap_tokens_per_profile() {
        let _g = TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        reset_for_test();
        inc_bootstrap_tokens_issued("default");
        inc_bootstrap_tokens_issued("staging");
        inc_bootstrap_tokens_issued("default");
        assert_eq!(bootstrap_tokens_issued_total("default"), 2);
        assert_eq!(bootstrap_tokens_issued_total("staging"), 1);
    }

    #[test]
    fn requests_pending_inc_dec_clamp() {
        let _g = TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        reset_for_test();
        inc_requests_pending("whatsapp");
        inc_requests_pending("whatsapp");
        assert_eq!(requests_pending("whatsapp"), 2);
        dec_requests_pending("whatsapp");
        assert_eq!(requests_pending("whatsapp"), 1);
        dec_requests_pending("whatsapp");
        dec_requests_pending("whatsapp"); // would underflow
        assert_eq!(requests_pending("whatsapp"), 0);
    }

    #[test]
    fn dec_untracked_channel_stays_zero() {
        let _g = TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        reset_for_test();
        dec_requests_pending("signal");
        assert_eq!(requests_pending("signal"), 0);
        // The channel is registered so the refresh routine can see it.
        assert!(pending_channels().contains(&"signal".to_string()));
    }

    #[test]
    fn requests_pending_sub_clamp() {
        let _g = TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        reset_for_test();
        for _ in 0..5 {
            inc_requests_pending("telegram");
        }
        sub_requests_pending("telegram", 3);
        assert_eq!(requests_pending("telegram"), 2);
        sub_requests_pending("telegram", 99); // clamps
        assert_eq!(requests_pending("telegram"), 0);
    }

    #[test]
    fn set_pending_authoritative() {
        let _g = TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        reset_for_test();
        set_requests_pending("whatsapp", 7);
        assert_eq!(requests_pending("whatsapp"), 7);
        set_requests_pending("whatsapp", 0);
        assert_eq!(requests_pending("whatsapp"), 0);
        set_requests_pending("whatsapp", -3); // clamps
        assert_eq!(requests_pending("whatsapp"), 0);
    }

    #[test]
    fn escape_prometheus_label_values() {
        assert_eq!(escape("plain"), "plain");
        assert_eq!(escape("a\\b\"c\nd"), "a\\\\b\\\"c\\nd");
    }

    #[test]
    fn render_zero_when_empty() {
        let _g = TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        reset_for_test();
        let mut s = String::new();
        render(&mut s);
        assert!(s.contains("pairing_approvals_total{channel=\"\",result=\"\"} 0"));
        assert!(s.contains("pairing_codes_expired_total 0"));
        assert!(s.contains("pairing_bootstrap_tokens_issued_total{profile=\"\"} 0"));
        assert!(s.contains("pairing_requests_pending{channel=\"\"} 0"));
    }

    #[test]
    fn render_emits_values() {
        let _g = TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        reset_for_test();
        inc_approvals("whatsapp", "ok");
        add_codes_expired(4);
        inc_bootstrap_tokens_issued("default");
        set_requests_pending("telegram", 2);
        let mut s = String::new();
        render(&mut s);
        assert!(s.contains("pairing_approvals_total{channel=\"whatsapp\",result=\"ok\"} 1"));
        assert!(s.contains("pairing_codes_expired_total 4"));
        assert!(s.contains("pairing_bootstrap_tokens_issued_total{profile=\"default\"} 1"));
        assert!(s.contains("pairing_requests_pending{channel=\"telegram\"} 2"));
    }

    #[test]
    fn render_sorts_and_escapes_labels() {
        let _g = TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        reset_for_test();
        inc_approvals("whatsapp", "ok");
        inc_approvals("telegram", "ok");
        inc_approvals("telegram", "expired");
        inc_bootstrap_tokens_issued("a\"b");
        let mut s = String::new();
        render(&mut s);
        let pos = |needle: &str| s.find(needle).unwrap_or_else(|| panic!("missing {needle}"));
        let tg_expired = pos("pairing_approvals_total{channel=\"telegram\",result=\"expired\"} 1");
        let tg_ok = pos("pairing_approvals_total{channel=\"telegram\",result=\"ok\"} 1");
        let wa_ok = pos("pairing_approvals_total{channel=\"whatsapp\",result=\"ok\"} 1");
        assert!(tg_expired < tg_ok && tg_ok < wa_ok);
        assert!(s.contains("pairing_bootstrap_tokens_issued_total{profile=\"a\\\"b\"} 1"));
        // The placeholder series is dropped once the family has real data.
        assert!(!s.contains("pairing_approvals_total{channel=\"\",result=\"\"} 0"));
    }
}