near_async/instrumentation/
queue.rs1use core::str;
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::AtomicU64;
5
6use parking_lot::RwLock;
7
8use crate::instrumentation::metrics::QUEUE_PENDING_MESSAGES;
9use near_o11y::metrics::prometheus;
10
11pub struct InstrumentedQueue {
13 pending: RwLock<HashMap<String, AtomicU64>>,
14 pending_messages_gauge: prometheus::core::GenericGauge<prometheus::core::AtomicI64>,
15}
16
17impl InstrumentedQueue {
18 pub fn new(queue_name: &str) -> Arc<Self> {
19 let queue = Arc::new(Self {
20 pending: RwLock::new(HashMap::new()),
21 pending_messages_gauge: QUEUE_PENDING_MESSAGES.with_label_values(&[queue_name]),
22 });
23 queue
24 }
25
26 pub fn enqueue(&self, message_type: &str) {
27 {
28 let pending = self.pending.read();
30 if let Some(counter) = pending.get(message_type) {
31 counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
32 self.pending_messages_gauge.inc();
33 return;
34 }
35 }
36 let mut pending = self.pending.write();
38 let counter = pending.entry(message_type.to_string()).or_insert_with(|| AtomicU64::new(0));
39 counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
40 self.pending_messages_gauge.inc();
41 }
42
43 pub fn dequeue(&self, message_type: &str) {
44 let pending = self.pending.read();
45 if let Some(counter) = pending.get(message_type) {
46 counter.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
47 self.pending_messages_gauge.dec();
48 }
49 }
50
51 pub fn get_pending_events(&self) -> HashMap<String, u64> {
52 let pending = self.pending.read();
53 pending
54 .iter()
55 .map(|(k, v)| (k.clone(), v.load(std::sync::atomic::Ordering::Relaxed)))
56 .collect()
57 }
58}