1use dashmap::DashMap;
21use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
22use std::sync::LazyLock;
23
/// Label pair identifying a single approvals counter series.
///
/// Used as the map key for the approvals counters, so it must be hashable
/// and comparable for equality.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ApprovalKey {
    /// Value emitted for the `channel` label.
    channel: String,
    /// Value emitted for the `result` label.
    result: String,
}
29
/// Approval counters keyed by (channel, result); rendered as `pairing_approvals_total`.
static APPROVALS: LazyLock<DashMap<ApprovalKey, AtomicU64>> = LazyLock::new(DashMap::new);
/// Running total of expired codes; rendered as `pairing_codes_expired_total`.
static CODES_EXPIRED: AtomicU64 = AtomicU64::new(0);
/// Bootstrap-token counters keyed by profile; rendered as
/// `pairing_bootstrap_tokens_issued_total`.
static BOOTSTRAP_TOKENS_ISSUED: LazyLock<DashMap<String, AtomicU64>> = LazyLock::new(DashMap::new);
/// Pending-request gauges keyed by channel; rendered as `pairing_requests_pending`.
/// The value type is signed, but the decrement/subtract/set helpers in this file
/// clamp at zero.
static REQUESTS_PENDING: LazyLock<DashMap<String, AtomicI64>> = LazyLock::new(DashMap::new);
34
35pub fn inc_approvals(channel: &str, result: &str) {
39 APPROVALS
40 .entry(ApprovalKey {
41 channel: channel.to_string(),
42 result: result.to_string(),
43 })
44 .or_insert_with(|| AtomicU64::new(0))
45 .fetch_add(1, Ordering::Relaxed);
46}
47
/// Adds `n` to the global expired-codes counter.
pub fn add_codes_expired(n: u64) {
    CODES_EXPIRED.fetch_add(n, Ordering::Relaxed);
}
54
55pub fn inc_bootstrap_tokens_issued(profile: &str) {
58 BOOTSTRAP_TOKENS_ISSUED
59 .entry(profile.to_string())
60 .or_insert_with(|| AtomicU64::new(0))
61 .fetch_add(1, Ordering::Relaxed);
62}
63
64pub fn inc_requests_pending(channel: &str) {
66 REQUESTS_PENDING
67 .entry(channel.to_string())
68 .or_insert_with(|| AtomicI64::new(0))
69 .fetch_add(1, Ordering::Relaxed);
70}
71
72pub fn dec_requests_pending(channel: &str) {
78 let entry = REQUESTS_PENDING
79 .entry(channel.to_string())
80 .or_insert_with(|| AtomicI64::new(0));
81 let mut current = entry.load(Ordering::Relaxed);
82 loop {
83 if current <= 0 {
84 entry.store(0, Ordering::Relaxed);
85 return;
86 }
87 match entry.compare_exchange_weak(
88 current,
89 current - 1,
90 Ordering::Relaxed,
91 Ordering::Relaxed,
92 ) {
93 Ok(_) => return,
94 Err(actual) => current = actual,
95 }
96 }
97}
98
99pub fn sub_requests_pending(channel: &str, n: i64) {
102 if n <= 0 {
103 return;
104 }
105 let entry = REQUESTS_PENDING
106 .entry(channel.to_string())
107 .or_insert_with(|| AtomicI64::new(0));
108 let mut current = entry.load(Ordering::Relaxed);
109 loop {
110 let next = (current - n).max(0);
111 match entry.compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed) {
112 Ok(_) => return,
113 Err(actual) => current = actual,
114 }
115 }
116}
117
118pub fn set_requests_pending(channel: &str, value: i64) {
123 REQUESTS_PENDING
124 .entry(channel.to_string())
125 .or_insert_with(|| AtomicI64::new(0))
126 .store(value.max(0), Ordering::Relaxed);
127}
128
129pub fn pending_channels() -> Vec<String> {
133 REQUESTS_PENDING.iter().map(|e| e.key().clone()).collect()
134}
135
136pub fn approvals_total(channel: &str, result: &str) -> u64 {
139 APPROVALS
140 .get(&ApprovalKey {
141 channel: channel.to_string(),
142 result: result.to_string(),
143 })
144 .map(|v| v.value().load(Ordering::Relaxed))
145 .unwrap_or(0)
146}
147
/// Returns the current value of the global expired-codes counter.
pub fn codes_expired_total() -> u64 {
    CODES_EXPIRED.load(Ordering::Relaxed)
}
151
152pub fn bootstrap_tokens_issued_total(profile: &str) -> u64 {
153 BOOTSTRAP_TOKENS_ISSUED
154 .get(profile)
155 .map(|v| v.value().load(Ordering::Relaxed))
156 .unwrap_or(0)
157}
158
159pub fn requests_pending(channel: &str) -> i64 {
160 REQUESTS_PENDING
161 .get(channel)
162 .map(|v| v.value().load(Ordering::Relaxed))
163 .unwrap_or(0)
164}
165
166pub fn reset_for_test() {
170 APPROVALS.clear();
171 CODES_EXPIRED.store(0, Ordering::Relaxed);
172 BOOTSTRAP_TOKENS_ISSUED.clear();
173 REQUESTS_PENDING.clear();
174}
175
/// Escapes a label value for embedding between double quotes in the rendered output.
///
/// Backslash becomes `\\`, double quote becomes `\"`, and line feed becomes the
/// two characters `\n`. Every other character is copied through unchanged.
fn escape(s: &str) -> String {
    s.chars()
        .fold(String::with_capacity(s.len()), |mut escaped, ch| {
            match ch {
                '\\' => escaped.push_str("\\\\"),
                '"' => escaped.push_str("\\\""),
                '\n' => escaped.push_str("\\n"),
                other => escaped.push(other),
            }
            escaped
        })
}
188
189pub fn render(out: &mut String) {
192 out.push_str(
193 "# HELP pairing_approvals_total Pairing approval attempts by channel and result.\n",
194 );
195 out.push_str("# TYPE pairing_approvals_total counter\n");
196 if APPROVALS.is_empty() {
197 out.push_str("pairing_approvals_total{channel=\"\",result=\"\"} 0\n");
198 } else {
199 let mut rows: Vec<(ApprovalKey, u64)> = APPROVALS
200 .iter()
201 .map(|e| (e.key().clone(), e.value().load(Ordering::Relaxed)))
202 .collect();
203 rows.sort_by(|a, b| {
204 (a.0.channel.clone(), a.0.result.clone())
205 .cmp(&(b.0.channel.clone(), b.0.result.clone()))
206 });
207 for (k, v) in rows {
208 out.push_str(&format!(
209 "pairing_approvals_total{{channel=\"{}\",result=\"{}\"}} {}\n",
210 escape(&k.channel),
211 escape(&k.result),
212 v
213 ));
214 }
215 }
216
217 out.push_str("# HELP pairing_codes_expired_total Pairing setup codes pruned past TTL or rejected as expired on approve.\n");
218 out.push_str("# TYPE pairing_codes_expired_total counter\n");
219 out.push_str(&format!(
220 "pairing_codes_expired_total {}\n",
221 CODES_EXPIRED.load(Ordering::Relaxed)
222 ));
223
224 out.push_str(
225 "# HELP pairing_bootstrap_tokens_issued_total Bootstrap tokens minted by profile.\n",
226 );
227 out.push_str("# TYPE pairing_bootstrap_tokens_issued_total counter\n");
228 if BOOTSTRAP_TOKENS_ISSUED.is_empty() {
229 out.push_str("pairing_bootstrap_tokens_issued_total{profile=\"\"} 0\n");
230 } else {
231 let mut rows: Vec<(String, u64)> = BOOTSTRAP_TOKENS_ISSUED
232 .iter()
233 .map(|e| (e.key().clone(), e.value().load(Ordering::Relaxed)))
234 .collect();
235 rows.sort_by(|a, b| a.0.cmp(&b.0));
236 for (profile, v) in rows {
237 out.push_str(&format!(
238 "pairing_bootstrap_tokens_issued_total{{profile=\"{}\"}} {}\n",
239 escape(&profile),
240 v
241 ));
242 }
243 }
244
245 out.push_str(
246 "# HELP pairing_requests_pending Pending pairing requests by channel (push-tracked).\n",
247 );
248 out.push_str("# TYPE pairing_requests_pending gauge\n");
249 if REQUESTS_PENDING.is_empty() {
250 out.push_str("pairing_requests_pending{channel=\"\"} 0\n");
251 } else {
252 let mut rows: Vec<(String, i64)> = REQUESTS_PENDING
253 .iter()
254 .map(|e| (e.key().clone(), e.value().load(Ordering::Relaxed)))
255 .collect();
256 rows.sort_by(|a, b| a.0.cmp(&b.0));
257 for (channel, v) in rows {
258 out.push_str(&format!(
259 "pairing_requests_pending{{channel=\"{}\"}} {}\n",
260 escape(&channel),
261 v
262 ));
263 }
264 }
265}
266
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes the tests: they all mutate the same process-wide metric statics.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Takes the test lock (recovering it if an earlier test panicked while
    /// holding it) and wipes all metrics. Keep the guard alive for the whole test.
    fn fresh_metrics() -> MutexGuard<'static, ()> {
        let guard = TEST_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        reset_for_test();
        guard
    }

    /// Renders the current metrics into a fresh string.
    fn rendered() -> String {
        let mut text = String::new();
        render(&mut text);
        text
    }

    #[test]
    fn approvals_inc_and_read() {
        let _guard = fresh_metrics();
        inc_approvals("whatsapp", "ok");
        inc_approvals("whatsapp", "ok");
        inc_approvals("telegram", "expired");
        assert_eq!(approvals_total("whatsapp", "ok"), 2);
        assert_eq!(approvals_total("telegram", "expired"), 1);
        assert_eq!(approvals_total("whatsapp", "expired"), 0);
    }

    #[test]
    fn codes_expired_accumulates() {
        let _guard = fresh_metrics();
        add_codes_expired(3);
        add_codes_expired(2);
        assert_eq!(codes_expired_total(), 5);
    }

    #[test]
    fn bootstrap_tokens_per_profile() {
        let _guard = fresh_metrics();
        inc_bootstrap_tokens_issued("default");
        inc_bootstrap_tokens_issued("staging");
        inc_bootstrap_tokens_issued("default");
        assert_eq!(bootstrap_tokens_issued_total("default"), 2);
        assert_eq!(bootstrap_tokens_issued_total("staging"), 1);
    }

    #[test]
    fn requests_pending_inc_dec_clamp() {
        let _guard = fresh_metrics();
        inc_requests_pending("whatsapp");
        inc_requests_pending("whatsapp");
        assert_eq!(requests_pending("whatsapp"), 2);
        dec_requests_pending("whatsapp");
        assert_eq!(requests_pending("whatsapp"), 1);
        dec_requests_pending("whatsapp");
        // One decrement too many: the gauge must stay at zero.
        dec_requests_pending("whatsapp");
        assert_eq!(requests_pending("whatsapp"), 0);
    }

    #[test]
    fn requests_pending_sub_clamp() {
        let _guard = fresh_metrics();
        for _ in 0..5 {
            inc_requests_pending("telegram");
        }
        sub_requests_pending("telegram", 3);
        assert_eq!(requests_pending("telegram"), 2);
        // Subtracting more than is pending clamps at zero.
        sub_requests_pending("telegram", 99);
        assert_eq!(requests_pending("telegram"), 0);
    }

    #[test]
    fn set_pending_authoritative() {
        let _guard = fresh_metrics();
        set_requests_pending("whatsapp", 7);
        assert_eq!(requests_pending("whatsapp"), 7);
        set_requests_pending("whatsapp", 0);
        assert_eq!(requests_pending("whatsapp"), 0);
        // Negative inputs are stored as zero.
        set_requests_pending("whatsapp", -3);
        assert_eq!(requests_pending("whatsapp"), 0);
    }

    #[test]
    fn render_zero_when_empty() {
        let _guard = fresh_metrics();
        let text = rendered();
        assert!(text.contains("pairing_approvals_total{channel=\"\",result=\"\"} 0"));
        assert!(text.contains("pairing_codes_expired_total 0"));
        assert!(text.contains("pairing_bootstrap_tokens_issued_total{profile=\"\"} 0"));
        assert!(text.contains("pairing_requests_pending{channel=\"\"} 0"));
    }

    #[test]
    fn render_emits_values() {
        let _guard = fresh_metrics();
        inc_approvals("whatsapp", "ok");
        add_codes_expired(4);
        inc_bootstrap_tokens_issued("default");
        set_requests_pending("telegram", 2);
        let text = rendered();
        assert!(text.contains("pairing_approvals_total{channel=\"whatsapp\",result=\"ok\"} 1"));
        assert!(text.contains("pairing_codes_expired_total 4"));
        assert!(text.contains("pairing_bootstrap_tokens_issued_total{profile=\"default\"} 1"));
        assert!(text.contains("pairing_requests_pending{channel=\"telegram\"} 2"));
    }
}